statements) {
@Override
synchronized public void clear(boolean inferred, Resource[] contexts) {
- BulkByScrollResponse response = new DeleteByQueryRequestBuilder(clientProvider.getClient(),
- DeleteByQueryAction.INSTANCE)
- .filter(getQueryBuilder(null, null, null, inferred, contexts))
- .abortOnVersionConflict(false)
- .source(index)
- .get();
+ DeleteByQueryRequest.Builder builder = new DeleteByQueryRequest.Builder();
+ DeleteByQueryRequest build = builder.index(index)
+ .query(getQuery(null, null, null, inferred, contexts))
+ .conflicts(Conflicts.Proceed)
+ .build();
+
+ try {
+ Long deleted = clientProvider.getClient().deleteByQuery(build).deleted();
+ } catch (IOException e) {
+ throw new SailException(e);
+ }
- long deleted = response.getDeleted();
}
@Override
@@ -166,7 +164,7 @@ public CloseableIteration extends ExtensibleStatement> getStatements(Resource
IRI predicate,
Value object, boolean inferred, Resource... context) {
- QueryBuilder queryBuilder = getQueryBuilder(subject, predicate, object, inferred, context);
+ Query queryBuilder = getQuery(subject, predicate, object, inferred, context);
return new LookAheadIteration<>() {
@@ -221,73 +219,64 @@ protected void handleClose() throws SailException {
}
- private QueryBuilder getQueryBuilder(Resource subject, IRI predicate, Value object, boolean inferred,
- Resource[] contexts) {
+ private Query getQuery(Resource subject, IRI predicate, Value object, boolean inferred, Resource[] contexts) {
- BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+ BoolQuery.Builder mainQuery = new BoolQuery.Builder();
if (subject != null) {
- boolQueryBuilder.must(QueryBuilders.termQuery("subject", subject.stringValue()));
+ mainQuery.must(b -> b.term(t -> t.field("subject").value(subject.stringValue())));
+
if (subject instanceof IRI) {
- boolQueryBuilder.must(QueryBuilders.termQuery("subject_IRI", true));
+ mainQuery.must(b -> b.term(t -> t.field("subject_IRI").value(true)));
} else {
- boolQueryBuilder.must(QueryBuilders.termQuery("subject_BNode", true));
+ mainQuery.must(b -> b.term(t -> t.field("subject_BNode").value(true)));
}
}
if (predicate != null) {
- boolQueryBuilder.must(QueryBuilders.termQuery("predicate", predicate.stringValue()));
+ mainQuery.must(b -> b.term(t -> t.field("predicate").value(predicate.stringValue())));
}
if (object != null) {
- boolQueryBuilder.must(QueryBuilders.termQuery("object_Hash", object.stringValue().hashCode()));
+ mainQuery.must(b -> b.term(t -> t.field("object_Hash").value(object.stringValue().hashCode())));
+
if (object instanceof IRI) {
- boolQueryBuilder.must(QueryBuilders.termQuery("object_IRI", true));
+ mainQuery.must(b -> b.term(t -> t.field("object_IRI").value(true)));
} else if (object instanceof BNode) {
- boolQueryBuilder.must(QueryBuilders.termQuery("object_BNode", true));
+ mainQuery.must(b -> b.term(t -> t.field("object_BNode").value(true)));
} else {
- boolQueryBuilder.must(
- QueryBuilders.termQuery("object_Datatype", ((Literal) object).getDatatype().stringValue()));
+ mainQuery.must(b -> b
+ .term(t -> t.field("object_Datatype").value(((Literal) object).getDatatype().stringValue())));
+
if (((Literal) object).getLanguage().isPresent()) {
- boolQueryBuilder
- .must(QueryBuilders.termQuery("object_Lang", ((Literal) object).getLanguage().get()));
+ mainQuery.must(
+ b -> b.term(t -> t.field("object_Lang").value(((Literal) object).getLanguage().get())));
}
}
}
if (contexts != null && contexts.length > 0) {
- BoolQueryBuilder contextQueryBuilder = new BoolQueryBuilder();
-
for (Resource context : contexts) {
-
if (context == null) {
-
- contextQueryBuilder.should(new BoolQueryBuilder().mustNot(QueryBuilders.existsQuery("context")));
-
+ mainQuery.should(b -> b.bool(bb -> bb.mustNot(mb -> mb.exists(a -> a.field("context")))));
} else if (context instanceof IRI) {
-
- contextQueryBuilder.should(
- new BoolQueryBuilder()
- .must(QueryBuilders.termQuery("context", context.stringValue()))
- .must(QueryBuilders.termQuery("context_IRI", true)));
-
+ mainQuery.should(b -> b.bool(bb -> {
+ bb.must(mb -> mb.term(t -> t.field("context").value(context.stringValue())));
+ bb.must(mb -> mb.term(t -> t.field("context_IRI").value(true)));
+ }));
} else { // BNode
- contextQueryBuilder.should(
- new BoolQueryBuilder()
- .must(QueryBuilders.termQuery("context", context.stringValue()))
- .must(QueryBuilders.termQuery("context_BNode", true)));
+ mainQuery.should(b -> b.bool(bb -> {
+ bb.must(mb -> mb.term(t -> t.field("context").value(context.stringValue())));
+ bb.must(mb -> mb.term(t -> t.field("context_BNode").value(true)));
+ }));
}
-
}
-
- boolQueryBuilder.must(contextQueryBuilder);
-
}
- boolQueryBuilder.must(QueryBuilders.termQuery("inferred", inferred));
+ mainQuery.must(b -> b.term(t -> t.field("inferred").value(inferred)));
- return QueryBuilders.constantScoreQuery(boolQueryBuilder);
+ return mainQuery.build()._toQuery();
}
@Override
@@ -604,7 +593,7 @@ public synchronized boolean removeStatementsByQuery(Resource subj, IRI pred, Val
BulkByScrollResponse response = new DeleteByQueryRequestBuilder(clientProvider.getClient(),
DeleteByQueryAction.INSTANCE)
- .filter(getQueryBuilder(subj, pred, obj, inferred, contexts))
+ .filter(getQuery(subj, pred, obj, inferred, contexts))
.source(index)
.abortOnVersionConflict(false)
.get();
diff --git a/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/ElasticsearchStore.java b/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/ElasticsearchStore.java
index b21b5c29ff..a53897a121 100644
--- a/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/ElasticsearchStore.java
+++ b/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/ElasticsearchStore.java
@@ -25,13 +25,12 @@
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.extensiblestore.ExtensibleStore;
import org.eclipse.rdf4j.sail.extensiblestore.valuefactory.ExtensibleStatementHelper;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._types.HealthStatus;
+
/**
*
* An RDF4J SailStore persisted to Elasticsearch.
@@ -49,9 +48,8 @@
* There is no write-ahead logging, so a failure during a transaction may result in partially persisted changes.
*
*
- * @see Elastic License FAQ
- *
* @author Håvard Mikkelsen Ottestad
+ * @see Elastic License FAQ
*/
@Experimental
public class ElasticsearchStore extends ExtensibleStore {
@@ -101,11 +99,11 @@ public ElasticsearchStore(ClientProvider clientPool, String index, Cache cache)
}
- public ElasticsearchStore(Client client, String index) {
+ public ElasticsearchStore(ElasticsearchClient client, String index) {
this(client, index, Cache.EAGER);
}
- public ElasticsearchStore(Client client, String index, Cache cache) {
+ public ElasticsearchStore(ElasticsearchClient client, String index, Cache cache) {
this(new UnclosableClientProvider(new UserProvidedClientProvider(client)), index, cache);
}
@@ -152,16 +150,10 @@ public void waitForElasticsearch(int time, TemporalUnit timeUnit) {
}
try {
- Client client = clientProvider.getClient();
-
- ClusterHealthResponse clusterHealthResponse = client.admin()
- .cluster()
- .health(new ClusterHealthRequest())
- .actionGet();
- ClusterHealthStatus status = clusterHealthResponse.getStatus();
- logger.info("Cluster status: {}", status.name());
+ ElasticsearchClient client = clientProvider.getClient();
- if (status.equals(ClusterHealthStatus.GREEN) || status.equals(ClusterHealthStatus.YELLOW)) {
+ HealthStatus status = client.cluster().health().status();
+ if (status.equals(HealthStatus.Green) || status.equals(HealthStatus.Yellow)) {
logger.info("Elasticsearch started!");
return;
diff --git a/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/UserProvidedClientProvider.java b/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/UserProvidedClientProvider.java
index a41e48e8ad..44019c40ac 100644
--- a/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/UserProvidedClientProvider.java
+++ b/core/sail/elasticsearch-store/src/main/java/org/eclipse/rdf4j/sail/elasticsearchstore/UserProvidedClientProvider.java
@@ -10,7 +10,7 @@
*******************************************************************************/
package org.eclipse.rdf4j.sail.elasticsearchstore;
-import org.elasticsearch.client.Client;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
/**
* Used by the user to provide an Elasticsearch Client to the ElasticsearchStore instead of providing host, port,
@@ -20,16 +20,16 @@
*/
public class UserProvidedClientProvider implements ClientProvider {
- final private Client client;
+ final private ElasticsearchClient client;
transient boolean closed;
- public UserProvidedClientProvider(Client client) {
+ public UserProvidedClientProvider(ElasticsearchClient client) {
this.client = client;
}
@Override
- public Client getClient() {
+ public ElasticsearchClient getClient() {
return client;
}
diff --git a/core/sail/elasticsearch/pom.xml b/core/sail/elasticsearch/pom.xml
index b8c924bcf9..14aa359e90 100644
--- a/core/sail/elasticsearch/pom.xml
+++ b/core/sail/elasticsearch/pom.xml
@@ -16,20 +16,9 @@
${project.version}