diff --git a/buildSrc/version.properties b/buildSrc/version.properties index 5f76b232ecb30..9c6a3b9691e7b 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -27,7 +27,7 @@ httpcore = 4.4.5 httpasyncclient = 4.1.2 commonslogging = 1.1.3 commonscodec = 1.10 -hamcrest = 1.3 +hamcrest = 2.0.0.0 securemock = 1.2 # When updating mocksocket, please also update server/src/main/resources/org/elasticsearch/bootstrap/test-framework.policy mocksocket = 1.2 diff --git a/client/rest-high-level/build.gradle b/client/rest-high-level/build.gradle index bfe7c3d956cd2..367f7fc609cee 100644 --- a/client/rest-high-level/build.gradle +++ b/client/rest-high-level/build.gradle @@ -57,7 +57,7 @@ dependencies { testCompile "org.elasticsearch.test:framework:${version}" testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" testCompile "junit:junit:${versions.junit}" - testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}" + testCompile "org.hamcrest:java-hamcrest:${versions.hamcrest}" //this is needed to make RestHighLevelClientTests#testApiNamingConventions work from IDEs testCompile "org.elasticsearch:rest-api-spec:${version}" } @@ -70,6 +70,8 @@ dependencyLicenses { } } +compileTestJava.options.compilerArgs << "-Xlint:-unchecked" + forbiddenApisMain { // core does not depend on the httpclient for compile so we add the signatures here. We don't add them for test as they are already // specified diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 57e63791fd30f..fbc059a375233 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -117,7 +117,7 @@ static Request bulk(BulkRequest bulkRequest) throws IOException { Params parameters = new Params(request); parameters.withTimeout(bulkRequest.timeout()); parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy()); - + parameters.withPipeline(bulkRequest.pipeline()); // Bulk API only supports newline delimited JSON or Smile. Before executing // the bulk, we need to check that all requests have the same content-type // and this content-type is supported by the Bulk API. @@ -237,6 +237,13 @@ static Request bulk(BulkRequest bulkRequest) throws IOException { return request; } + private static String orDefaultToGlobal(String value, String globalDefault) { + if(Strings.isNullOrEmpty(value)){ + return globalDefault; + } + return value; + } + static Request exists(GetRequest getRequest) { return getStyleRequest(HttpHead.METHOD_NAME, getRequest); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java index fdd5634ddd6bd..02654db9f6a7b 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java @@ -28,14 +28,18 @@ import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.search.SearchHit; +import org.hamcrest.Matcher; +import org.hamcrest.Matchers; +import java.io.IOException; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -44,10 +48,19 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - +import java.util.stream.IntStream; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.fieldFromSource; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasIndex; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasProperty; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasType; import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; @@ -57,7 +70,7 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase { private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.Listener listener) { return BulkProcessor.builder( - (request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener); + (request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener); } public void testThatBulkProcessorCountIsCorrect() throws Exception { @@ -66,10 +79,10 @@ public void testThatBulkProcessorCountIsCorrect() throws Exception { int numDocs = randomIntBetween(10, 100); try (BulkProcessor processor = initBulkProcessorBuilder(listener) - //let's make sure that the bulk action limit trips, one single execution will index all the documents - .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) - .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) - .build()) { + //let's make sure that the bulk action limit trips, one single execution will index all the documents + .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) + .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) + .build()) { MultiGetRequest multiGetRequest = indexDocs(processor, numDocs); @@ -90,9 +103,9 @@ public void testBulkProcessorFlush() throws Exception { int numDocs = randomIntBetween(10, 100); try (BulkProcessor processor = initBulkProcessorBuilder(listener) - //let's make sure that this bulk won't be automatically flushed - .setConcurrentRequests(randomIntBetween(0, 10)).setBulkActions(numDocs + randomIntBetween(1, 100)) - .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { + //let's make sure that this bulk won't be automatically flushed + .setConcurrentRequests(randomIntBetween(0, 10)).setBulkActions(numDocs + randomIntBetween(1, 100)) + .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { MultiGetRequest multiGetRequest = indexDocs(processor, numDocs); @@ -125,9 +138,9 @@ public void testBulkProcessorConcurrentRequests() throws Exception { MultiGetRequest multiGetRequest; try (BulkProcessor processor = initBulkProcessorBuilder(listener) - .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions) - //set interval and size to high values - .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { + .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions) + //set interval and size to high values + .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { multiGetRequest = indexDocs(processor, numDocs); @@ -165,11 +178,11 @@ public void testBulkProcessorWaitOnClose() throws Exception { int numDocs = randomIntBetween(10, 100); BulkProcessor processor = initBulkProcessorBuilder(listener) - //let's make sure that the bulk action limit trips, one single execution will index all the documents - .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) - .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10), - RandomPicks.randomFrom(random(), ByteSizeUnit.values()))) - .build(); + //let's make sure that the bulk action limit trips, one single execution will index all the documents + .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) + .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10), + RandomPicks.randomFrom(random(), ByteSizeUnit.values()))) + .build(); MultiGetRequest multiGetRequest = indexDocs(processor, numDocs); assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true)); @@ -220,20 +233,20 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch); try (BulkProcessor processor = initBulkProcessorBuilder(listener) - .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions) - //set interval and size to high values - .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { + .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions) + //set interval and size to high values + .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { for (int i = 1; i <= numDocs; i++) { if (randomBoolean()) { testDocs++; processor.add(new IndexRequest("test", "test", Integer.toString(testDocs)) - .source(XContentType.JSON, "field", "value")); + .source(XContentType.JSON, "field", "value")); multiGetRequest.add("test", "test", Integer.toString(testDocs)); } else { testReadOnlyDocs++; processor.add(new IndexRequest("test-ro", "test", Integer.toString(testReadOnlyDocs)) - .source(XContentType.JSON, "field", "value")); + .source(XContentType.JSON, "field", "value")); } } } @@ -268,23 +281,86 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception assertMultiGetResponse(highLevelClient().mget(multiGetRequest, RequestOptions.DEFAULT), testDocs); } - private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception { + public void testGlobalParametersAndBulkProcessor() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + BulkProcessorTestListener listener = new BulkProcessorTestListener(latch); + createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ"); + + int numDocs = randomIntBetween(10, 10); + try (BulkProcessor processor = initBulkProcessorBuilder(listener) + //let's make sure that the bulk action limit trips, one single execution will index all the documents + .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) + .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) + .setDefaultIndex("test") + .setDefaultType("test") + .setDefaultRouting("routing") + .setDefaultPipeline("pipeline_id") + .build()) { + + indexDocs(processor, numDocs, null, null, "test", "test", "pipeline_id"); + latch.await(); + + assertThat(listener.beforeCounts.get(), equalTo(1)); + assertThat(listener.afterCounts.get(), equalTo(1)); + assertThat(listener.bulkFailures.size(), equalTo(0)); + assertResponseItems(listener.bulkItems, numDocs); + + Iterable hits = searchAll(new SearchRequest("test").routing("routing")); + + assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ")))); + assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType("test")))); + assertThat(hits, containsInAnyOrder(expectedIds(numDocs))); + } + } + + @SuppressWarnings("unchecked") + private Matcher[] expectedIds(int numDocs) { + return IntStream.rangeClosed(1, numDocs) + .boxed() + .map(n -> hasId(n.toString())) + .>toArray(Matcher[]::new); + } + + private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType, + String globalIndex, String globalType, String globalPipeline) throws Exception { MultiGetRequest multiGetRequest = new MultiGetRequest(); for (int i = 1; i <= numDocs; i++) { if (randomBoolean()) { - processor.add(new IndexRequest("test", "test", Integer.toString(i)) - .source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30))); + processor.add(new IndexRequest(localIndex, localType, Integer.toString(i)) + .source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30))); } else { - final String source = "{ \"index\":{\"_index\":\"test\",\"_type\":\"test\",\"_id\":\"" + Integer.toString(i) + "\"} }\n" - + Strings.toString(JsonXContent.contentBuilder() - .startObject().field("field", randomRealisticUnicodeOfLengthBetween(1, 30)).endObject()) + "\n"; - processor.add(new BytesArray(source), null, null, XContentType.JSON); + BytesArray data = bytesBulkRequest(localIndex, localType, i); + processor.add(data, globalIndex, globalType, globalPipeline, null, XContentType.JSON); } - multiGetRequest.add("test", "test", Integer.toString(i)); + multiGetRequest.add(localIndex, localType, Integer.toString(i)); } return multiGetRequest; } + private static BytesArray bytesBulkRequest(String localIndex, String localType, int id) throws IOException { + String action = Strings.toString(jsonBuilder() + .startObject() + .startObject("index") + .field("_index", localIndex) + .field("_type", localType) + .field("_id", Integer.toString(id)) + .endObject() + .endObject() + ); + String source = Strings.toString(jsonBuilder() + .startObject() + .field("field", randomRealisticUnicodeOfLengthBetween(1, 30)) + .endObject() + ); + + String request = action + "\n" + source + "\n"; + return new BytesArray(request); + } + + private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception { + return indexDocs(processor, numDocs, "test", "test", null, null, null); + } + private static void assertResponseItems(List bulkItemResponses, int numDocs) { assertThat(bulkItemResponses.size(), is(numDocs)); int i = 1; @@ -293,7 +369,7 @@ private static void assertResponseItems(List bulkItemResponses assertThat(bulkItemResponse.getType(), equalTo("test")); assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++))); assertThat("item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(), - bulkItemResponse.isFailed(), equalTo(false)); + bulkItemResponse.isFailed(), equalTo(false)); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkRequestWithGlobalParametersIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkRequestWithGlobalParametersIT.java new file mode 100644 index 0000000000000..d7d1814a106d2 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkRequestWithGlobalParametersIT.java @@ -0,0 +1,211 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client; + +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.search.SearchHit; + +import java.io.IOException; +import java.util.function.Function; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasIndex; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasProperty; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasType; +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +public class BulkRequestWithGlobalParametersIT extends ESRestHighLevelClientTestCase { + + public void testGlobalPipelineOnBulkRequest() throws IOException { + createFieldAddingPipleine("xyz", "fieldNameXYZ", "valueXYZ"); + + BulkRequest request = new BulkRequest(); + request.pipeline("xyz"); + request.add(new IndexRequest("test", "doc", "1") + .source(XContentType.JSON, "field", "bulk1")); + request.add(new IndexRequest("test", "doc", "2") + .source(XContentType.JSON, "field", "bulk2")); + + bulk(request); + + Iterable hits = searchAll("test"); + assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2"))); + assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ")))); + } + + public void testPipelineOnRequestOverridesGlobalPipeline() throws IOException { + createFieldAddingPipleine("globalId", "fieldXYZ", "valueXYZ"); + createFieldAddingPipleine("perIndexId", "someNewField", "someValue"); + + BulkRequest request = new BulkRequest(); + request.pipeline("globalId"); + request.add(new IndexRequest("test", "doc", "1") + .source(XContentType.JSON, "field", "bulk1") + .setPipeline("perIndexId")); + request.add(new IndexRequest("test", "doc", "2") + .source(XContentType.JSON, "field", "bulk2") + .setPipeline("perIndexId")); + + bulk(request); + + Iterable hits = searchAll("test"); + assertThat(hits, everyItem(hasProperty(fieldFromSource("someNewField"), equalTo("someValue")))); + // global pipeline was not applied + assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldXYZ"), nullValue()))); + } + + public void testMixPipelineOnRequestAndGlobal() throws IOException { + createFieldAddingPipleine("globalId", "fieldXYZ", "valueXYZ"); + createFieldAddingPipleine("perIndexId", "someNewField", "someValue"); + + BulkRequest request = new BulkRequest(); + request.pipeline("globalId"); + + request.add(new IndexRequest("test", "doc", "1") + .source(XContentType.JSON, "field", "bulk1") + .setPipeline("perIndexId")); + + request.add(new IndexRequest("test", "doc", "2") + .source(XContentType.JSON, "field", "bulk2")); + + bulk(request); + + Iterable hits = searchAll("test"); + assertThat(hits, containsInAnyOrder( + both(hasId("1")) + .and(hasProperty(fieldFromSource("someNewField"), equalTo("someValue"))), + both(hasId("2")) + .and(hasProperty(fieldFromSource("fieldXYZ"), equalTo("valueXYZ"))))); + } + + public void testGlobalIndex() throws IOException { + BulkRequest request = new BulkRequest("global_index", null); + request.add(new IndexRequest().type("doc").id("1") + .source(XContentType.JSON, "field", "bulk1")); + request.add(new IndexRequest().type("doc").id("2") + .source(XContentType.JSON, "field", "bulk2")); + + bulk(request); + + Iterable hits = searchAll("global_index"); + assertThat(hits, everyItem(hasIndex("global_index"))); + } + + public void testIndexGlobalAndPerRequest() throws IOException { + BulkRequest request = new BulkRequest("global_index", null); + request.add(new IndexRequest("local_index", "doc", "1") + .source(XContentType.JSON, "field", "bulk1")); + request.add(new IndexRequest().type("doc").id("2") // will take global index + .source(XContentType.JSON, "field", "bulk2")); + + bulk(request); + + Iterable hits = searchAll("local_index", "global_index"); + assertThat(hits, containsInAnyOrder( + both(hasId("1")) + .and(hasIndex("local_index")), + both(hasId("2")) + .and(hasIndex("global_index")))); + } + + public void testGlobalType() throws IOException { + BulkRequest request = new BulkRequest(null, "global_type"); + request.add(new IndexRequest("index").id("1") + .source(XContentType.JSON, "field", "bulk1")); + request.add(new IndexRequest("index").id("2") + .source(XContentType.JSON, "field", "bulk2")); + + bulk(request); + + Iterable hits = searchAll("index"); + assertThat(hits, everyItem(hasType("global_type"))); + } + + public void testTypeGlobalAndPerRequest() throws IOException { + BulkRequest request = new BulkRequest(null, "global_type"); + request.add(new IndexRequest("index1", "local_type", "1") + .source(XContentType.JSON, "field", "bulk1")); + request.add(new IndexRequest("index2").id("2") // will take global type + .source(XContentType.JSON, "field", "bulk2")); + + bulk(request); + + Iterable hits = searchAll("index1", "index2"); + assertThat(hits, containsInAnyOrder( + both(hasId("1")) + .and(hasType("local_type")), + both(hasId("2")) + .and(hasType("global_type")))); + } + + + public void testGlobalRouting() throws IOException { + BulkRequest request = new BulkRequest(null, null); + request.routing("routing"); + request.add(new IndexRequest("index", "type", "1") + .source(XContentType.JSON, "field", "bulk1")); + request.add(new IndexRequest("index", "type", "2") + .source(XContentType.JSON, "field", "bulk1")); + + bulk(request); + + Iterable emptyHits = searchAll(new SearchRequest("index").routing("wrongRouting")); + assertThat(emptyHits, is(emptyIterable())); + + Iterable hits = searchAll(new SearchRequest("index").routing("routing")); + assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2"))); + } + + public void testMixLocalAndGlobalRouting() throws IOException { + BulkRequest request = new BulkRequest(null, null); + request.routing("globalRouting"); + request.add(new IndexRequest("index", "type", "1") + .source(XContentType.JSON, "field", "bulk1")); + request.add(new IndexRequest("index", "type", "2") + .routing("localRouting") + .source(XContentType.JSON, "field", "bulk1")); + + bulk(request); + + Iterable hits = searchAll(new SearchRequest("index").routing("globalRouting", "localRouting")); + assertThat(hits, containsInAnyOrder(hasId("1"), hasId("2"))); + } + + private BulkResponse bulk(BulkRequest request) throws IOException { + BulkResponse bulkResponse = execute(request, highLevelClient()::bulk, highLevelClient()::bulkAsync); + assertFalse(bulkResponse.hasFailures()); + return bulkResponse; + } + + @SuppressWarnings("unchecked") + private static Function fieldFromSource(String fieldName) { + return (response) -> (T) response.getSourceAsMap().get(fieldName); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java index 9217b0b4e5550..02bb5e1e52690 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ESRestHighLevelClientTestCase.java @@ -22,6 +22,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; @@ -29,15 +31,20 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.rest.ESRestTestCase; import org.junit.AfterClass; import org.junit.Before; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Base64; import java.util.Collections; import java.util.Objects; +import java.util.stream.Collectors; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; public abstract class ESRestHighLevelClientTestCase extends ESRestTestCase { @@ -124,6 +131,22 @@ protected static XContentBuilder buildRandomXContentPipeline() throws IOExceptio return buildRandomXContentPipeline(pipelineBuilder); } + protected static void createFieldAddingPipleine(String id, String fieldName, String value) throws IOException { + XContentBuilder pipeline = jsonBuilder() + .startObject() + .startArray("processors") + .startObject() + .startObject("set") + .field("field", fieldName) + .field("value", value) + .endObject() + .endObject() + .endArray() + .endObject(); + + createPipeline(new PutPipelineRequest(id, BytesReference.bytes(pipeline), XContentType.JSON)); + } + protected static void createPipeline(String pipelineId) throws IOException { XContentBuilder builder = buildRandomXContentPipeline(); createPipeline(new PutPipelineRequest(pipelineId, BytesReference.bytes(builder), builder.contentType())); @@ -153,4 +176,22 @@ protected Settings restClientSettings() { .put(ThreadContext.PREFIX + ".Authorization", token) .build(); } + + protected Iterable searchAll(String... indices) throws IOException { + SearchRequest searchRequest = new SearchRequest(indices); + return searchAll(searchRequest); + } + + protected Iterable searchAll(SearchRequest searchRequest) throws IOException { + refreshIndexes(searchRequest.indices()); + SearchResponse search = highLevelClient().search(searchRequest, RequestOptions.DEFAULT); + return search.getHits(); + } + + private void refreshIndexes(String... indices) throws IOException { + String joinedIndices = Arrays.stream(indices) + .collect(Collectors.joining(",")); + Response refreshResponse = client().performRequest(new Request("POST", "/" + joinedIndices + "/_refresh")); + assertEquals(200, refreshResponse.getStatusLine().getStatusCode()); + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 8887bed226ca1..8f1a69e18c409 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -99,6 +99,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.RandomObjects; +import org.hamcrest.Matchers; import java.io.IOException; import java.io.InputStream; @@ -859,6 +860,21 @@ public void testBulkWithDifferentContentTypes() throws IOException { } } + public void testGlobalPipelineOnBulkRequest() throws IOException { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.pipeline("xyz"); + bulkRequest.add(new IndexRequest("test", "doc", "11") + .source(XContentType.JSON, "field", "bulk1")); + bulkRequest.add(new IndexRequest("test", "doc", "12") + .source(XContentType.JSON, "field", "bulk2")); + bulkRequest.add(new IndexRequest("test", "doc", "13") + .source(XContentType.JSON, "field", "bulk3")); + + Request request = RequestConverters.bulk(bulkRequest); + + assertThat(request.getParameters(), Matchers.hasEntry("pipeline","xyz")); + } + public void testSearchNullSource() throws IOException { SearchRequest searchRequest = new SearchRequest(); Request request = RequestConverters.search(searchRequest); diff --git a/client/rest/build.gradle b/client/rest/build.gradle index 273836a31f0cb..bed841014ea48 100644 --- a/client/rest/build.gradle +++ b/client/rest/build.gradle @@ -47,7 +47,7 @@ dependencies { testCompile "org.elasticsearch.client:test:${version}" testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" testCompile "junit:junit:${versions.junit}" - testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}" + testCompile "org.hamcrest:java-hamcrest:${versions.hamcrest}" testCompile "org.elasticsearch:securemock:${versions.securemock}" testCompile "org.elasticsearch:mocksocket:${versions.mocksocket}" } diff --git a/client/sniffer/build.gradle b/client/sniffer/build.gradle index 6ba69c5713c57..23ae9b0d8b59d 100644 --- a/client/sniffer/build.gradle +++ b/client/sniffer/build.gradle @@ -45,7 +45,7 @@ dependencies { testCompile "org.elasticsearch.client:test:${version}" testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" testCompile "junit:junit:${versions.junit}" - testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}" + testCompile "org.hamcrest:java-hamcrest:${versions.hamcrest}" testCompile "org.elasticsearch:securemock:${versions.securemock}" testCompile "org.elasticsearch:mocksocket:${versions.mocksocket}" } diff --git a/client/test/build.gradle b/client/test/build.gradle index e66d2be57f1ea..82f511329831c 100644 --- a/client/test/build.gradle +++ b/client/test/build.gradle @@ -27,7 +27,7 @@ dependencies { compile "org.apache.httpcomponents:httpcore:${versions.httpcore}" compile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" compile "junit:junit:${versions.junit}" - compile "org.hamcrest:hamcrest-all:${versions.hamcrest}" + compile "org.hamcrest:java-hamcrest:${versions.hamcrest}" } forbiddenApisMain { diff --git a/client/transport/build.gradle b/client/transport/build.gradle index 269a37105fb19..dacb341e73cb9 100644 --- a/client/transport/build.gradle +++ b/client/transport/build.gradle @@ -32,7 +32,7 @@ dependencies { compile "org.elasticsearch.plugin:rank-eval-client:${version}" testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" testCompile "junit:junit:${versions.junit}" - testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}" + testCompile "org.hamcrest:java-hamcrest:${versions.hamcrest}" } dependencyLicenses { diff --git a/distribution/tools/launchers/build.gradle b/distribution/tools/launchers/build.gradle index ca1aa6bcac9d6..5b17920308a9d 100644 --- a/distribution/tools/launchers/build.gradle +++ b/distribution/tools/launchers/build.gradle @@ -27,7 +27,7 @@ dependencies { compile parent.project('java-version-checker') testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" testCompile "junit:junit:${versions.junit}" - testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}" + testCompile "org.hamcrest:java-hamcrest:${versions.hamcrest}" } archivesBaseName = 'elasticsearch-launchers' diff --git a/libs/core/build.gradle b/libs/core/build.gradle index 9c90837bd80ed..f1d46f2641a9f 100644 --- a/libs/core/build.gradle +++ b/libs/core/build.gradle @@ -74,7 +74,7 @@ publishing { dependencies { testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" testCompile "junit:junit:${versions.junit}" - testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}" + testCompile "org.hamcrest:java-hamcrest:${versions.hamcrest}" if (!isEclipse && !isIdea) { java9Compile sourceSets.main.output diff --git a/libs/nio/build.gradle b/libs/nio/build.gradle index f6a6ff652450f..5e68edab599b1 100644 --- a/libs/nio/build.gradle +++ b/libs/nio/build.gradle @@ -34,7 +34,7 @@ dependencies { testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" testCompile "junit:junit:${versions.junit}" - testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}" + testCompile "org.hamcrest:java-hamcrest:${versions.hamcrest}" if (isEclipse == false || project.path == ":libs:nio-tests") { testCompile("org.elasticsearch.test:framework:${version}") { diff --git a/libs/secure-sm/build.gradle b/libs/secure-sm/build.gradle index 3baf3513b1206..bd317fd53f901 100644 --- a/libs/secure-sm/build.gradle +++ b/libs/secure-sm/build.gradle @@ -34,7 +34,7 @@ dependencies { testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" testCompile "junit:junit:${versions.junit}" - testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}" + testCompile "org.hamcrest:java-hamcrest:${versions.hamcrest}" if (isEclipse == false || project.path == ":libs:secure-sm-tests") { testCompile("org.elasticsearch.test:framework:${version}") { diff --git a/libs/x-content/build.gradle b/libs/x-content/build.gradle index 0ec4e0d6ad312..1270ef2c2b3be 100644 --- a/libs/x-content/build.gradle +++ b/libs/x-content/build.gradle @@ -42,7 +42,7 @@ dependencies { testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" testCompile "junit:junit:${versions.junit}" - testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}" + testCompile "org.hamcrest:java-hamcrest:${versions.hamcrest}" if (isEclipse == false || project.path == ":libs:x-content-tests") { testCompile("org.elasticsearch.test:framework:${version}") { diff --git a/qa/vagrant/build.gradle b/qa/vagrant/build.gradle index 4c3b48cbac946..d117bad81d7ff 100644 --- a/qa/vagrant/build.gradle +++ b/qa/vagrant/build.gradle @@ -26,9 +26,10 @@ plugins { dependencies { compile "junit:junit:${versions.junit}" - compile "org.hamcrest:hamcrest-core:${versions.hamcrest}" - compile "org.hamcrest:hamcrest-library:${versions.hamcrest}" - compile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" + compile "org.hamcrest:java-hamcrest:${versions.hamcrest}" +// compile "org.hamcrest:hamcrest-library:${versions.hamcrest}" + + compile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" compile "org.apache.httpcomponents:httpcore:${versions.httpcore}" compile "org.apache.httpcomponents:httpclient:${versions.httpclient}" diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index b0d553534e44d..e2d01aad230bd 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -35,12 +35,25 @@ */ public interface DocWriteRequest extends IndicesRequest { + /** + * Set the index for this request + * @return the Request + */ + T index(String index); + /** * Get the index that this request operates on * @return the index */ String index(); + + /** + * Set the type for this request + * @return the Request + */ + T type(String type); + /** * Get the type that this request operates on * @return the type diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index f8f9d154b14d6..d98376cb4ac0b 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; +import java.util.function.Supplier; /** * A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request @@ -88,6 +89,10 @@ public static class Builder { private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB); private TimeValue flushInterval = null; private BackoffPolicy backoffPolicy = BackoffPolicy.exponentialBackoff(); + private String defaultIndex; + private String defaultType; + private String defaultRouting; + private String defaultPipeline; private Builder(BiConsumer> consumer, Listener listener, Scheduler scheduler, Runnable onClose) { @@ -136,6 +141,26 @@ public Builder setFlushInterval(TimeValue flushInterval) { return this; } + public Builder setDefaultIndex(String defaultIndex) { + this.defaultIndex = defaultIndex; + return this; + } + + public Builder setDefaultType(String defaultType) { + this.defaultType = defaultType; + return this; + } + + public Builder setDefaultRouting(String defaultRouting) { + this.defaultRouting = defaultRouting; + return this; + } + + public Builder setDefaultPipeline(String defaultPipeline) { + this.defaultPipeline = defaultPipeline; + return this; + } + /** * Sets a custom backoff policy. The backoff policy defines how the bulk processor should handle retries of bulk requests internally * in case they have failed due to resource constraints (i.e. a thread pool was full). @@ -156,8 +181,14 @@ public Builder setBackoffPolicy(BackoffPolicy backoffPolicy) { * Builds a new bulk processor. */ public BulkProcessor build() { - return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, bulkSize, flushInterval, - scheduler, onClose); + return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions, + bulkSize, flushInterval, scheduler, onClose, createBulkRequestWithDefaults()); + } + + private Supplier createBulkRequestWithDefaults() { + return () -> new BulkRequest(defaultIndex, defaultType) + .pipeline(defaultPipeline) + .routing(defaultRouting); } } @@ -184,6 +215,7 @@ public static Builder builder(BiConsumer bulkRequestSupplier; private final BulkRequestHandler bulkRequestHandler; private final Runnable onClose; @@ -191,10 +223,11 @@ public static Builder builder(BiConsumer> consumer, BackoffPolicy backoffPolicy, Listener listener, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval, - Scheduler scheduler, Runnable onClose) { + Scheduler scheduler, Runnable onClose, Supplier bulkRequestSupplier) { this.bulkActions = bulkActions; this.bulkSize = bulkSize.getBytes(); - this.bulkRequest = new BulkRequest(); + this.bulkRequest = bulkRequestSupplier.get(); + this.bulkRequestSupplier = bulkRequestSupplier; this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests); // Start period flushing task after everything is setup this.cancellableFlushTask = startFlushTask(flushInterval, scheduler); @@ -333,7 +366,7 @@ private void execute() { final BulkRequest bulkRequest = this.bulkRequest; final long executionId = executionIdGen.incrementAndGet(); - this.bulkRequest = new BulkRequest(); + this.bulkRequest = bulkRequestSupplier.get(); this.bulkRequestHandler.execute(bulkRequest, executionId); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 6698aa4b62ab5..7d1ec59a05ed7 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -90,12 +90,21 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT; private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; + private String defaultPipelineId; + private String defaultRouting; + private String defaultIndex; + private String defaultType; private long sizeInBytes = 0; public BulkRequest() { } + public BulkRequest(@Nullable String defaultIndex, @Nullable String defaultType) { + this.defaultIndex = defaultIndex; + this.defaultType = defaultType; + } + /** * Adds a list of requests to be executed. Either index or delete requests. */ @@ -154,6 +163,8 @@ public BulkRequest add(IndexRequest request, @Nullable Object payload) { BulkRequest internalAdd(IndexRequest request, @Nullable Object payload) { Objects.requireNonNull(request, "'request' must not be null"); + applyGlobalParameters(request); + requests.add(request); addPayload(payload); // lack of source is validated in validate() method @@ -175,6 +186,8 @@ public BulkRequest add(UpdateRequest request, @Nullable Object payload) { BulkRequest internalAdd(UpdateRequest request, @Nullable Object payload) { Objects.requireNonNull(request, "'request' must not be null"); + applyGlobalParameters(request); + requests.add(request); addPayload(payload); if (request.doc() != null) { @@ -199,6 +212,8 @@ public BulkRequest add(DeleteRequest request) { public BulkRequest add(DeleteRequest request, @Nullable Object payload) { Objects.requireNonNull(request, "'request' must not be null"); + applyGlobalParameters(request); + requests.add(request); addPayload(payload); sizeInBytes += REQUEST_OVERHEAD; @@ -500,6 +515,15 @@ public final BulkRequest timeout(TimeValue timeout) { return this; } + public final BulkRequest pipeline(String defaultPipelineId) { + this.defaultPipelineId = defaultPipelineId; + return this; + } + + public final BulkRequest routing(String defaultRouting){ + this.defaultRouting = defaultRouting; + return this; + } /** * A timeout to wait if the index operation can't be performed immediately. Defaults to {@code 1m}. */ @@ -511,6 +535,10 @@ public TimeValue timeout() { return timeout; } + public String pipeline() { + return defaultPipelineId; + } + private int findNextMarker(byte marker, int from, BytesReference data, int length) { for (int i = from; i < length; i++) { if (data.get(i) == marker) { @@ -576,4 +604,16 @@ public String getDescription() { return "requests[" + requests.size() + "], indices[" + Strings.collectionToDelimitedString(indices, ", ") + "]"; } + private void applyGlobalParameters(DocWriteRequest request) { + request.index(valueOrDefault(request.index(), defaultIndex)); + request.routing(valueOrDefault(request.routing(), defaultRouting)); + request.type(valueOrDefault(request.type(), defaultType)); + } + + private static String valueOrDefault(String value, String globalDefault) { + if (Strings.isNullOrEmpty(value)) { + return globalDefault; + } + return value; + } } diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java index 879e8e665cd44..41cfc1d40e829 100644 --- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java @@ -108,6 +108,7 @@ public String type() { /** * Sets the type of the document to delete. */ + @Override public DeleteRequest type(String type) { this.type = type; return this; diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 339880dad44bf..8e95d602e796c 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -211,6 +211,7 @@ public String type() { /** * Sets the type of the indexed document. */ + @Override public IndexRequest type(String type) { this.type = type; return this; diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java index 3fbfa381ad352..0ff0152e893e0 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java @@ -72,19 +72,10 @@ public void testBulkProcessorFlushPreservesContext() throws InterruptedException try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) { threadPool.getThreadContext().putHeader(headerKey, headerValue); threadPool.getThreadContext().putTransient(transientKey, transientValue); - bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - } - }, 1, bulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), flushInterval, threadPool, () -> {}); + bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), emptyListener(), + 1, bulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), flushInterval, + threadPool, () -> { + }, BulkRequest::new); } assertNull(threadPool.getThreadContext().getHeader(headerKey)); assertNull(threadPool.getThreadContext().getTransient(transientKey)); @@ -100,28 +91,33 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) bulkProcessor.close(); } + public void testAwaitOnCloseCallsOnClose() throws Exception { final AtomicBoolean called = new AtomicBoolean(false); - BulkProcessor bulkProcessor = new BulkProcessor((request, listener) -> { - }, BackoffPolicy.noBackoff(), new BulkProcessor.Listener() { + BiConsumer> consumer = (request, listener) -> { + }; + BulkProcessor bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), emptyListener(), + 0, 10, new ByteSizeValue(1000), null, + (delay, executor, command) -> null, () -> called.set(true), BulkRequest::new); + + assertFalse(called.get()); + bulkProcessor.awaitClose(100, TimeUnit.MILLISECONDS); + assertTrue(called.get()); + } + + private BulkProcessor.Listener emptyListener() { + return new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { - } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - } - }, 0, 10, new ByteSizeValue(1000), null, (delay, executor, command) -> null, () -> called.set(true)); - - assertFalse(called.get()); - bulkProcessor.awaitClose(100, TimeUnit.MILLISECONDS); - assertTrue(called.get()); + }; } } diff --git a/test/framework/build.gradle b/test/framework/build.gradle index 5e5c53f4406c9..8dc3a4ebb5438 100644 --- a/test/framework/build.gradle +++ b/test/framework/build.gradle @@ -24,7 +24,7 @@ dependencies { compile "org.elasticsearch:elasticsearch-cli:${version}" compile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" compile "junit:junit:${versions.junit}" - compile "org.hamcrest:hamcrest-all:${versions.hamcrest}" + compile "org.hamcrest:java-hamcrest:${versions.hamcrest}" compile "org.apache.lucene:lucene-test-framework:${versions.lucene}" compile "org.apache.lucene:lucene-codecs:${versions.lucene}" compile "commons-logging:commons-logging:${versions.commonslogging}" diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 65ed746accacb..04630fe7e91f4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -58,6 +58,7 @@ import org.elasticsearch.test.NotEqualMessageBuilder; import org.hamcrest.CoreMatchers; import org.hamcrest.Matcher; +import org.hamcrest.core.CombinableMatcher; import java.io.IOException; import java.nio.file.Files; @@ -70,6 +71,7 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.function.Function; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -472,6 +474,14 @@ public static Matcher hasScore(final float score) { return new ElasticsearchMatchers.SearchHitHasScoreMatcher(score); } + public static CombinableMatcher hasProperty(Function property, Matcher valueMatcher) { + return ElasticsearchMatchers.HasPropertyLambdaMatcher.hasProperty(property, valueMatcher); + } + + public static Function fieldFromSource(String fieldName) { + return (response) -> (T) response.getSourceAsMap().get(fieldName); + } + public static T assertBooleanSubQuery(Query query, Class subqueryType, int i) { assertThat(query, instanceOf(BooleanQuery.class)); BooleanQuery q = (BooleanQuery) query; diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchMatchers.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchMatchers.java index f49cc3bd39ee7..3332058648106 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchMatchers.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchMatchers.java @@ -20,7 +20,12 @@ import org.elasticsearch.search.SearchHit; import org.hamcrest.Description; +import org.hamcrest.FeatureMatcher; +import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; +import org.hamcrest.core.CombinableMatcher; + +import java.util.function.Function; public class ElasticsearchMatchers { @@ -115,4 +120,27 @@ public void describeTo(final Description description) { description.appendText("searchHit score should be ").appendValue(score); } } + + public static class HasPropertyLambdaMatcher extends FeatureMatcher { + + private final Function property; + + private HasPropertyLambdaMatcher(Matcher subMatcher, Function property) { + super(subMatcher, "object with", "lambda"); + this.property = property; + } + + @Override + protected V featureValueOf(T actual) { + return property.apply(actual); + } + + /** + * @param valueMatcher The matcher to apply to the property + * @param property The lambda to fetch property + */ + public static CombinableMatcher hasProperty(Function property, Matcher valueMatcher) { + return new CombinableMatcher<>(new HasPropertyLambdaMatcher<>(valueMatcher, property)); + } + } } diff --git a/x-pack/plugin/security/cli/build.gradle b/x-pack/plugin/security/cli/build.gradle index 377d10ec7f203..76f1b9dc3436c 100644 --- a/x-pack/plugin/security/cli/build.gradle +++ b/x-pack/plugin/security/cli/build.gradle @@ -12,7 +12,7 @@ dependencies { compile 'org.bouncycastle:bcprov-jdk15on:1.59' testImplementation 'com.google.jimfs:jimfs:1.1' testCompile "junit:junit:${versions.junit}" - testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}" + testCompile "org.hamcrest:java-hamcrest:${versions.hamcrest}" testCompile 'org.elasticsearch:securemock:1.2' testCompile "org.elasticsearch.test:framework:${version}" testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') diff --git a/x-pack/transport-client/build.gradle b/x-pack/transport-client/build.gradle index a96f4146fbf67..c1df555d953c3 100644 --- a/x-pack/transport-client/build.gradle +++ b/x-pack/transport-client/build.gradle @@ -12,7 +12,7 @@ dependencies { compile "org.elasticsearch.client:transport:${version}" testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" testCompile "junit:junit:${versions.junit}" - testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}" + testCompile "org.hamcrest:java-hamcrest:${versions.hamcrest}" } dependencyLicenses.enabled = false