Skip to content

Commit a1ec913

Browse files
authored
Changed CCR internal integration tests to use a leader and follower cluster instead of a single cluster (elastic#34344)
The `AutoFollowTests` needs to restart the clusters between each tests, because it is using auto follow stats in assertions. Auto follow stats are only reset by stopping the elected master node. Extracted the `testGetOperationsBasedOnGlobalSequenceId()` test to its own test, because it just tests the shard changes api. * Renamed AutoFollowTests to AutoFollowIT, because it is an integration test. Renamed ShardChangesIT to IndexFollowingIT, because shard changes it the name of an internal api and isn't a good name for an integration test. * move creation of NodeConfigurationSource to a seperate method * Fixes issues after merge, moved assertSeqNos() and assertSameDocIdsOnShards() methods from ESIntegTestCase to InternalTestCluster, so that ccr tests can use these methods too.
1 parent 67e7464 commit a1ec913

File tree

10 files changed

+712
-427
lines changed

10 files changed

+712
-427
lines changed

server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ protected void beforeIndexDeletion() throws Exception {
111111
if (disableBeforeIndexDeletion == false) {
112112
super.beforeIndexDeletion();
113113
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
114-
assertSeqNos();
115-
assertSameDocIdsOnShards();
114+
internalCluster().assertSeqNos();
115+
internalCluster().assertSameDocIdsOnShards();
116116
}
117117
}
118118

server/src/test/java/org/elasticsearch/recovery/RelocationIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
102102
@Override
103103
protected void beforeIndexDeletion() throws Exception {
104104
super.beforeIndexDeletion();
105-
assertSeqNos();
106-
assertSameDocIdsOnShards();
105+
internalCluster().assertSeqNos();
106+
internalCluster().assertSameDocIdsOnShards();
107107
}
108108

109109
public void testSimpleRelocationNoIndexing() {

test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

Lines changed: 0 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,12 @@
1919

2020
package org.elasticsearch.test;
2121

22-
import com.carrotsearch.hppc.ObjectLongMap;
23-
import com.carrotsearch.hppc.cursors.IntObjectCursor;
24-
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
2522
import com.carrotsearch.randomizedtesting.RandomizedContext;
2623
import com.carrotsearch.randomizedtesting.annotations.TestGroup;
2724
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
2825
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
2926
import org.apache.http.HttpHost;
3027
import org.apache.lucene.search.Sort;
31-
import org.apache.lucene.store.AlreadyClosedException;
3228
import org.apache.lucene.util.LuceneTestCase;
3329
import org.elasticsearch.ElasticsearchException;
3430
import org.elasticsearch.ExceptionsHelper;
@@ -78,7 +74,6 @@
7874
import org.elasticsearch.cluster.metadata.MappingMetaData;
7975
import org.elasticsearch.cluster.metadata.MetaData;
8076
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
81-
import org.elasticsearch.cluster.node.DiscoveryNode;
8277
import org.elasticsearch.cluster.routing.IndexRoutingTable;
8378
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
8479
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -125,15 +120,10 @@
125120
import org.elasticsearch.index.MergeSchedulerConfig;
126121
import org.elasticsearch.index.MockEngineFactoryPlugin;
127122
import org.elasticsearch.index.codec.CodecService;
128-
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
129123
import org.elasticsearch.index.engine.Segment;
130124
import org.elasticsearch.index.mapper.MappedFieldType;
131125
import org.elasticsearch.index.mapper.MapperService;
132126
import org.elasticsearch.index.mapper.MockFieldFilterPlugin;
133-
import org.elasticsearch.index.seqno.SeqNoStats;
134-
import org.elasticsearch.index.seqno.SequenceNumbers;
135-
import org.elasticsearch.index.shard.IndexShard;
136-
import org.elasticsearch.index.shard.IndexShardTestCase;
137127
import org.elasticsearch.index.translog.Translog;
138128
import org.elasticsearch.indices.IndicesQueryCache;
139129
import org.elasticsearch.indices.IndicesRequestCache;
@@ -197,7 +187,6 @@
197187
import java.util.concurrent.TimeUnit;
198188
import java.util.concurrent.atomic.AtomicInteger;
199189
import java.util.concurrent.atomic.AtomicLong;
200-
import java.util.function.BiFunction;
201190
import java.util.function.BooleanSupplier;
202191
import java.util.function.Function;
203192
import java.util.stream.Collectors;
@@ -221,7 +210,6 @@
221210
import static org.hamcrest.Matchers.equalTo;
222211
import static org.hamcrest.Matchers.is;
223212
import static org.hamcrest.Matchers.lessThanOrEqualTo;
224-
import static org.hamcrest.Matchers.not;
225213
import static org.hamcrest.Matchers.notNullValue;
226214
import static org.hamcrest.Matchers.startsWith;
227215

@@ -2359,108 +2347,6 @@ public static Index resolveIndex(String index) {
23592347
return new Index(index, uuid);
23602348
}
23612349

2362-
protected void assertSeqNos() throws Exception {
2363-
final BiFunction<ClusterState, ShardRouting, IndexShard> getInstanceShardInstance = (clusterState, shardRouting) -> {
2364-
if (shardRouting.assignedToNode() == false) {
2365-
return null;
2366-
}
2367-
final DiscoveryNode assignedNode = clusterState.nodes().get(shardRouting.currentNodeId());
2368-
if (assignedNode == null) {
2369-
return null;
2370-
}
2371-
return internalCluster().getInstance(IndicesService.class, assignedNode.getName()).getShardOrNull(shardRouting.shardId());
2372-
};
2373-
assertBusy(() -> {
2374-
final ClusterState state = clusterService().state();
2375-
for (ObjectObjectCursor<String, IndexRoutingTable> indexRoutingTable : state.routingTable().indicesRouting()) {
2376-
for (IntObjectCursor<IndexShardRoutingTable> indexShardRoutingTable : indexRoutingTable.value.shards()) {
2377-
ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard();
2378-
if (primaryShardRouting == null) {
2379-
continue;
2380-
}
2381-
final IndexShard primaryShard = getInstanceShardInstance.apply(state, primaryShardRouting);
2382-
if (primaryShard == null) {
2383-
continue; //just ignore - shard movement
2384-
}
2385-
final SeqNoStats primarySeqNoStats;
2386-
final ObjectLongMap<String> syncGlobalCheckpoints;
2387-
try {
2388-
primarySeqNoStats = primaryShard.seqNoStats();
2389-
syncGlobalCheckpoints = primaryShard.getInSyncGlobalCheckpoints();
2390-
} catch (AlreadyClosedException ex) {
2391-
continue; // shard is closed - just ignore
2392-
}
2393-
assertThat(primaryShardRouting + " should have set the global checkpoint",
2394-
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
2395-
for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) {
2396-
final IndexShard replicaShard = getInstanceShardInstance.apply(state, replicaShardRouting);
2397-
if (replicaShard == null) {
2398-
continue; //just ignore - shard movement
2399-
}
2400-
final SeqNoStats seqNoStats;
2401-
try {
2402-
seqNoStats = replicaShard.seqNoStats();
2403-
} catch (AlreadyClosedException e) {
2404-
continue; // shard is closed - just ignore
2405-
}
2406-
assertThat(replicaShardRouting + " local checkpoint mismatch",
2407-
seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
2408-
assertThat(replicaShardRouting + " global checkpoint mismatch",
2409-
seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
2410-
assertThat(replicaShardRouting + " max seq no mismatch",
2411-
seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
2412-
// the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
2413-
assertThat(replicaShardRouting + " global checkpoint syncs mismatch", seqNoStats.getGlobalCheckpoint(),
2414-
equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId())));
2415-
}
2416-
}
2417-
}
2418-
});
2419-
}
2420-
2421-
/**
2422-
* Asserts that all shards with the same shardId should have document Ids.
2423-
*/
2424-
public void assertSameDocIdsOnShards() throws Exception {
2425-
assertBusy(() -> {
2426-
ClusterState state = client().admin().cluster().prepareState().get().getState();
2427-
for (ObjectObjectCursor<String, IndexRoutingTable> indexRoutingTable : state.routingTable().indicesRouting()) {
2428-
for (IntObjectCursor<IndexShardRoutingTable> indexShardRoutingTable : indexRoutingTable.value.shards()) {
2429-
ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard();
2430-
if (primaryShardRouting == null || primaryShardRouting.assignedToNode() == false) {
2431-
continue;
2432-
}
2433-
DiscoveryNode primaryNode = state.nodes().get(primaryShardRouting.currentNodeId());
2434-
IndexShard primaryShard = internalCluster().getInstance(IndicesService.class, primaryNode.getName())
2435-
.indexServiceSafe(primaryShardRouting.index()).getShard(primaryShardRouting.id());
2436-
final List<DocIdSeqNoAndTerm> docsOnPrimary;
2437-
try {
2438-
docsOnPrimary = IndexShardTestCase.getDocIdAndSeqNos(primaryShard);
2439-
} catch (AlreadyClosedException ex) {
2440-
continue;
2441-
}
2442-
for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) {
2443-
if (replicaShardRouting.assignedToNode() == false) {
2444-
continue;
2445-
}
2446-
DiscoveryNode replicaNode = state.nodes().get(replicaShardRouting.currentNodeId());
2447-
IndexShard replicaShard = internalCluster().getInstance(IndicesService.class, replicaNode.getName())
2448-
.indexServiceSafe(replicaShardRouting.index()).getShard(replicaShardRouting.id());
2449-
final List<DocIdSeqNoAndTerm> docsOnReplica;
2450-
try {
2451-
docsOnReplica = IndexShardTestCase.getDocIdAndSeqNos(replicaShard);
2452-
} catch (AlreadyClosedException ex) {
2453-
continue;
2454-
}
2455-
assertThat("out of sync shards: primary=[" + primaryShardRouting + "] num_docs_on_primary=[" + docsOnPrimary.size()
2456-
+ "] vs replica=[" + replicaShardRouting + "] num_docs_on_replica=[" + docsOnReplica.size() + "]",
2457-
docsOnReplica, equalTo(docsOnPrimary));
2458-
}
2459-
}
2460-
}
2461-
});
2462-
}
2463-
24642350
public static boolean inFipsJvm() {
24652351
return Security.getProviders()[0].getName().toLowerCase(Locale.ROOT).contains("fips");
24662352
}

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
*/
1919
package org.elasticsearch.test;
2020

21+
import com.carrotsearch.hppc.ObjectLongMap;
22+
import com.carrotsearch.hppc.cursors.IntObjectCursor;
23+
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
2124
import com.carrotsearch.randomizedtesting.RandomizedTest;
2225
import com.carrotsearch.randomizedtesting.SeedUtils;
2326
import com.carrotsearch.randomizedtesting.SysGlobals;
@@ -40,6 +43,8 @@
4043
import org.elasticsearch.cluster.node.DiscoveryNode;
4144
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
4245
import org.elasticsearch.cluster.node.DiscoveryNodes;
46+
import org.elasticsearch.cluster.routing.IndexRoutingTable;
47+
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
4348
import org.elasticsearch.cluster.routing.OperationRouting;
4449
import org.elasticsearch.cluster.routing.ShardRouting;
4550
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
@@ -76,8 +81,11 @@
7681
import org.elasticsearch.index.Index;
7782
import org.elasticsearch.index.IndexService;
7883
import org.elasticsearch.index.engine.CommitStats;
84+
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
7985
import org.elasticsearch.index.engine.Engine;
8086
import org.elasticsearch.index.engine.InternalEngine;
87+
import org.elasticsearch.index.seqno.SeqNoStats;
88+
import org.elasticsearch.index.seqno.SequenceNumbers;
8189
import org.elasticsearch.index.shard.IndexShard;
8290
import org.elasticsearch.index.shard.IndexShardTestCase;
8391
import org.elasticsearch.index.shard.ShardId;
@@ -127,6 +135,7 @@
127135
import java.util.concurrent.TimeUnit;
128136
import java.util.concurrent.atomic.AtomicBoolean;
129137
import java.util.concurrent.atomic.AtomicInteger;
138+
import java.util.function.BiFunction;
130139
import java.util.function.Function;
131140
import java.util.function.Predicate;
132141
import java.util.stream.Collectors;
@@ -148,6 +157,7 @@
148157
import static org.hamcrest.Matchers.equalTo;
149158
import static org.hamcrest.Matchers.greaterThan;
150159
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
160+
import static org.hamcrest.Matchers.not;
151161
import static org.junit.Assert.assertThat;
152162
import static org.junit.Assert.fail;
153163

@@ -1273,6 +1283,108 @@ public void assertConsistentHistoryBetweenTranslogAndLuceneIndex() throws IOExce
12731283
}
12741284
}
12751285

1286+
public void assertSeqNos() throws Exception {
1287+
final BiFunction<ClusterState, ShardRouting, IndexShard> getInstanceShardInstance = (clusterState, shardRouting) -> {
1288+
if (shardRouting.assignedToNode() == false) {
1289+
return null;
1290+
}
1291+
final DiscoveryNode assignedNode = clusterState.nodes().get(shardRouting.currentNodeId());
1292+
if (assignedNode == null) {
1293+
return null;
1294+
}
1295+
return getInstance(IndicesService.class, assignedNode.getName()).getShardOrNull(shardRouting.shardId());
1296+
};
1297+
assertBusy(() -> {
1298+
final ClusterState state = clusterService().state();
1299+
for (ObjectObjectCursor<String, IndexRoutingTable> indexRoutingTable : state.routingTable().indicesRouting()) {
1300+
for (IntObjectCursor<IndexShardRoutingTable> indexShardRoutingTable : indexRoutingTable.value.shards()) {
1301+
ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard();
1302+
if (primaryShardRouting == null) {
1303+
continue;
1304+
}
1305+
final IndexShard primaryShard = getInstanceShardInstance.apply(state, primaryShardRouting);
1306+
if (primaryShard == null) {
1307+
continue; //just ignore - shard movement
1308+
}
1309+
final SeqNoStats primarySeqNoStats;
1310+
final ObjectLongMap<String> syncGlobalCheckpoints;
1311+
try {
1312+
primarySeqNoStats = primaryShard.seqNoStats();
1313+
syncGlobalCheckpoints = primaryShard.getInSyncGlobalCheckpoints();
1314+
} catch (AlreadyClosedException ex) {
1315+
continue; // shard is closed - just ignore
1316+
}
1317+
assertThat(primaryShardRouting + " should have set the global checkpoint",
1318+
primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)));
1319+
for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) {
1320+
final IndexShard replicaShard = getInstanceShardInstance.apply(state, replicaShardRouting);
1321+
if (replicaShard == null) {
1322+
continue; //just ignore - shard movement
1323+
}
1324+
final SeqNoStats seqNoStats;
1325+
try {
1326+
seqNoStats = replicaShard.seqNoStats();
1327+
} catch (AlreadyClosedException e) {
1328+
continue; // shard is closed - just ignore
1329+
}
1330+
assertThat(replicaShardRouting + " local checkpoint mismatch",
1331+
seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint()));
1332+
assertThat(replicaShardRouting + " global checkpoint mismatch",
1333+
seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint()));
1334+
assertThat(replicaShardRouting + " max seq no mismatch",
1335+
seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo()));
1336+
// the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
1337+
assertThat(replicaShardRouting + " global checkpoint syncs mismatch", seqNoStats.getGlobalCheckpoint(),
1338+
equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId())));
1339+
}
1340+
}
1341+
}
1342+
});
1343+
}
1344+
1345+
/**
1346+
* Asserts that all shards with the same shardId should have document Ids.
1347+
*/
1348+
public void assertSameDocIdsOnShards() throws Exception {
1349+
assertBusy(() -> {
1350+
ClusterState state = client().admin().cluster().prepareState().get().getState();
1351+
for (ObjectObjectCursor<String, IndexRoutingTable> indexRoutingTable : state.routingTable().indicesRouting()) {
1352+
for (IntObjectCursor<IndexShardRoutingTable> indexShardRoutingTable : indexRoutingTable.value.shards()) {
1353+
ShardRouting primaryShardRouting = indexShardRoutingTable.value.primaryShard();
1354+
if (primaryShardRouting == null || primaryShardRouting.assignedToNode() == false) {
1355+
continue;
1356+
}
1357+
DiscoveryNode primaryNode = state.nodes().get(primaryShardRouting.currentNodeId());
1358+
IndexShard primaryShard = getInstance(IndicesService.class, primaryNode.getName())
1359+
.indexServiceSafe(primaryShardRouting.index()).getShard(primaryShardRouting.id());
1360+
final List<DocIdSeqNoAndTerm> docsOnPrimary;
1361+
try {
1362+
docsOnPrimary = IndexShardTestCase.getDocIdAndSeqNos(primaryShard);
1363+
} catch (AlreadyClosedException ex) {
1364+
continue;
1365+
}
1366+
for (ShardRouting replicaShardRouting : indexShardRoutingTable.value.replicaShards()) {
1367+
if (replicaShardRouting.assignedToNode() == false) {
1368+
continue;
1369+
}
1370+
DiscoveryNode replicaNode = state.nodes().get(replicaShardRouting.currentNodeId());
1371+
IndexShard replicaShard = getInstance(IndicesService.class, replicaNode.getName())
1372+
.indexServiceSafe(replicaShardRouting.index()).getShard(replicaShardRouting.id());
1373+
final List<DocIdSeqNoAndTerm> docsOnReplica;
1374+
try {
1375+
docsOnReplica = IndexShardTestCase.getDocIdAndSeqNos(replicaShard);
1376+
} catch (AlreadyClosedException ex) {
1377+
continue;
1378+
}
1379+
assertThat("out of sync shards: primary=[" + primaryShardRouting + "] num_docs_on_primary=[" + docsOnPrimary.size()
1380+
+ "] vs replica=[" + replicaShardRouting + "] num_docs_on_replica=[" + docsOnReplica.size() + "]",
1381+
docsOnReplica, equalTo(docsOnPrimary));
1382+
}
1383+
}
1384+
}
1385+
});
1386+
}
1387+
12761388
private void randomlyResetClients() throws IOException {
12771389
// only reset the clients on nightly tests, it causes heavy load...
12781390
if (RandomizedTest.isNightly() && rarely(random)) {

0 commit comments

Comments
 (0)