Skip to content

Commit ec948b7

Browse files
Add reactive variant
...and refactor a bit
1 parent 7d85f56 commit ec948b7

17 files changed

+1301
-77
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoClientAware.java renamed to spring-data-mongodb/src/main/java/org/springframework/data/mongodb/MongoClusterCapable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
/**
1919
* @author Christoph Strobl
2020
*/
21-
public interface MongoClientAware<T> {
21+
public interface MongoClusterCapable<T> {
2222

23-
T getMongoClient();
23+
T getMongoCluster();
2424
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.core;
17+
18+
import java.util.List;
19+
20+
import com.mongodb.client.model.bulk.ClientDeleteOneOptions;
21+
import com.mongodb.client.model.bulk.ClientReplaceOneOptions;
22+
import org.bson.Document;
23+
24+
import com.mongodb.MongoNamespace;
25+
import com.mongodb.client.model.DeleteOptions;
26+
import com.mongodb.client.model.UpdateOptions;
27+
import com.mongodb.client.model.bulk.ClientDeleteManyOptions;
28+
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
29+
import com.mongodb.client.model.bulk.ClientUpdateManyOptions;
30+
import com.mongodb.client.model.bulk.ClientUpdateOneOptions;
31+
32+
/**
33+
* @author Christoph Strobl
34+
*/
35+
abstract class BulkWriteSupport {
36+
37+
static ClientNamespacedWriteModel updateMany(MongoNamespace namespace, Document query, Object update,
38+
UpdateOptions updateOptions) {
39+
40+
ClientUpdateManyOptions updateManyOptions = ClientUpdateManyOptions.clientUpdateManyOptions();
41+
updateManyOptions.arrayFilters(updateOptions.getArrayFilters());
42+
updateManyOptions.collation(updateOptions.getCollation());
43+
updateManyOptions.upsert(updateOptions.isUpsert());
44+
updateManyOptions.hint(updateOptions.getHint());
45+
updateManyOptions.hintString(updateOptions.getHintString());
46+
47+
if (update instanceof List<?> pipeline) {
48+
return ClientNamespacedWriteModel.updateMany(namespace, query, (List<Document>) pipeline, updateManyOptions);
49+
} else {
50+
return ClientNamespacedWriteModel.updateMany(namespace, query, (Document) update, updateManyOptions);
51+
}
52+
}
53+
54+
static ClientNamespacedWriteModel updateOne(MongoNamespace namespace, Document query, Object update,
55+
UpdateOptions updateOptions) {
56+
57+
ClientUpdateOneOptions updateOneOptions = ClientUpdateOneOptions.clientUpdateOneOptions();
58+
updateOneOptions.sort(updateOptions.getSort());
59+
updateOneOptions.arrayFilters(updateOptions.getArrayFilters());
60+
updateOneOptions.collation(updateOptions.getCollation());
61+
updateOneOptions.upsert(updateOptions.isUpsert());
62+
updateOneOptions.hint(updateOptions.getHint());
63+
updateOneOptions.hintString(updateOptions.getHintString());
64+
65+
if (update instanceof List<?> pipeline) {
66+
return ClientNamespacedWriteModel.updateOne(namespace, query, (List<Document>) pipeline, updateOneOptions);
67+
} else {
68+
return ClientNamespacedWriteModel.updateOne(namespace, query, (Document) update, updateOneOptions);
69+
}
70+
}
71+
72+
static ClientNamespacedWriteModel removeMany(MongoNamespace namespace, Document query, DeleteOptions deleteOptions) {
73+
74+
ClientDeleteManyOptions clientDeleteManyOptions = ClientDeleteManyOptions.clientDeleteManyOptions();
75+
clientDeleteManyOptions.collation(deleteOptions.getCollation());
76+
clientDeleteManyOptions.hint(deleteOptions.getHint());
77+
clientDeleteManyOptions.hintString(deleteOptions.getHintString());
78+
79+
return ClientNamespacedWriteModel.deleteMany(namespace, query, clientDeleteManyOptions);
80+
}
81+
82+
static ClientNamespacedWriteModel removeOne(MongoNamespace namespace, Document query, DeleteOptions deleteOptions) {
83+
84+
ClientDeleteOneOptions clientDeleteOneOptions = ClientDeleteOneOptions.clientDeleteOneOptions();
85+
// TODO: open an issue with MongoDB to enable sort for deleteOne
86+
clientDeleteOneOptions.collation(deleteOptions.getCollation());
87+
clientDeleteOneOptions.hint(deleteOptions.getHint());
88+
clientDeleteOneOptions.hintString(deleteOptions.getHintString());
89+
90+
91+
return ClientNamespacedWriteModel.deleteOne(namespace, query, clientDeleteOneOptions);
92+
}
93+
94+
static ClientNamespacedWriteModel replaceOne(MongoNamespace namespace, Document query, Document replacement, UpdateOptions updateOptions) {
95+
96+
ClientReplaceOneOptions replaceOptions = ClientReplaceOneOptions.clientReplaceOneOptions();
97+
replaceOptions.sort(updateOptions.getSort());
98+
replaceOptions.upsert(updateOptions.isUpsert());
99+
replaceOptions.hint(updateOptions.getHint());
100+
replaceOptions.hintString(updateOptions.getHintString());
101+
replaceOptions.collation(updateOptions.getCollation());
102+
103+
return ClientNamespacedWriteModel.replaceOne(namespace, query,
104+
replacement, replaceOptions);
105+
}
106+
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/BulkWriter.java

Lines changed: 14 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@
2828
import org.springframework.data.mongodb.core.bulk.BulkOperation;
2929
import org.springframework.data.mongodb.core.bulk.BulkOperation.Insert;
3030
import org.springframework.data.mongodb.core.bulk.BulkOperation.Remove;
31+
import org.springframework.data.mongodb.core.bulk.BulkOperation.RemoveFirst;
3132
import org.springframework.data.mongodb.core.bulk.BulkOperation.Replace;
3233
import org.springframework.data.mongodb.core.bulk.BulkOperation.Update;
33-
import org.springframework.data.mongodb.core.bulk.BulkOperation.UpdateSingle;
34+
import org.springframework.data.mongodb.core.bulk.BulkOperation.UpdateFirst;
3435
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
3536
import org.springframework.data.mongodb.core.mapping.event.AfterSaveEvent;
3637

@@ -40,11 +41,7 @@
4041
import com.mongodb.client.model.UpdateOptions;
4142
import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
4243
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
43-
import com.mongodb.client.model.bulk.ClientDeleteManyOptions;
4444
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
45-
import com.mongodb.client.model.bulk.ClientReplaceOneOptions;
46-
import com.mongodb.client.model.bulk.ClientUpdateManyOptions;
47-
import com.mongodb.client.model.bulk.ClientUpdateOneOptions;
4845

4946
/**
5047
* Internal API wrapping a {@link MongoTemplate} to encapsulate {@link Bulk} handling.
@@ -81,7 +78,7 @@ public ClientBulkWriteResult write(String defaultDatabase, Order order, Bulk bul
8178
} else if (bulkOp instanceof Update update) {
8279

8380
Class<?> domainType = update.context().namespace().type();
84-
boolean multi = !(bulkOp instanceof UpdateSingle);
81+
boolean multi = !(bulkOp instanceof UpdateFirst);
8582

8683
UpdateContext updateContext = template.getQueryOperations().updateContext(update.update(), update.query(),
8784
update.upsert());
@@ -93,37 +90,9 @@ public ClientBulkWriteResult write(String defaultDatabase, Order order, Bulk bul
9390
UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, update.query());
9491

9592
if (multi) {
96-
97-
ClientUpdateManyOptions updateManyOptions = ClientUpdateManyOptions.clientUpdateManyOptions();
98-
updateManyOptions.arrayFilters(updateOptions.getArrayFilters());
99-
updateManyOptions.collation(updateOptions.getCollation());
100-
updateManyOptions.upsert(updateOptions.isUpsert());
101-
updateManyOptions.hint(updateOptions.getHint());
102-
updateManyOptions.hintString(updateOptions.getHintString());
103-
104-
if (mappedUpdate instanceof List<?> pipeline) {
105-
writeModels.add(ClientNamespacedWriteModel.updateMany(mongoNamespace, mappedQuery,
106-
(List<Document>) pipeline, updateManyOptions));
107-
} else {
108-
writeModels.add(ClientNamespacedWriteModel.updateMany(mongoNamespace, mappedQuery, (Document) mappedUpdate,
109-
updateManyOptions));
110-
}
93+
writeModels.add(BulkWriteSupport.updateMany(mongoNamespace, mappedQuery, mappedUpdate, updateOptions));
11194
} else {
112-
113-
ClientUpdateOneOptions updateOneOptions = ClientUpdateOneOptions.clientUpdateOneOptions();
114-
updateOneOptions.arrayFilters(updateOptions.getArrayFilters());
115-
updateOneOptions.collation(updateOptions.getCollation());
116-
updateOneOptions.upsert(updateOptions.isUpsert());
117-
updateOneOptions.hint(updateOptions.getHint());
118-
updateOneOptions.hintString(updateOptions.getHintString());
119-
120-
if (mappedUpdate instanceof List<?> pipeline) {
121-
writeModels.add(ClientNamespacedWriteModel.updateOne(mongoNamespace, mappedQuery, (List<Document>) pipeline,
122-
updateOneOptions));
123-
} else {
124-
writeModels.add(ClientNamespacedWriteModel.updateOne(mongoNamespace, mappedQuery, (Document) mappedUpdate,
125-
updateOneOptions));
126-
}
95+
writeModels.add(BulkWriteSupport.updateOne(mongoNamespace, mappedQuery, mappedUpdate, updateOptions));
12796
}
12897
} else if (bulkOp instanceof Remove remove) {
12998

@@ -132,12 +101,12 @@ public ClientBulkWriteResult write(String defaultDatabase, Order order, Bulk bul
132101

133102
Document mappedQuery = deleteContext.getMappedQuery(template.getPersistentEntity(domainType));
134103
DeleteOptions deleteOptions = deleteContext.getDeleteOptions(domainType);
135-
ClientDeleteManyOptions clientDeleteManyOptions = ClientDeleteManyOptions.clientDeleteManyOptions();
136-
clientDeleteManyOptions.collation(deleteOptions.getCollation());
137-
clientDeleteManyOptions.hint(deleteOptions.getHint());
138-
clientDeleteManyOptions.hintString(deleteOptions.getHintString());
139104

140-
writeModels.add(ClientNamespacedWriteModel.deleteMany(mongoNamespace, mappedQuery, clientDeleteManyOptions));
105+
if (remove instanceof RemoveFirst) {
106+
writeModels.add(BulkWriteSupport.removeOne(mongoNamespace, mappedQuery, deleteOptions));
107+
} else {
108+
writeModels.add(BulkWriteSupport.removeMany(mongoNamespace, mappedQuery, deleteOptions));
109+
}
141110
} else if (bulkOp instanceof Replace replace) {
142111

143112
Class<?> domainType = replace.context().namespace().type();
@@ -151,15 +120,8 @@ public ClientBulkWriteResult write(String defaultDatabase, Order order, Bulk bul
151120
Document mappedQuery = updateContext.getMappedQuery(template.getPersistentEntity(domainType));
152121
UpdateOptions updateOptions = updateContext.getUpdateOptions(domainType, replace.query());
153122

154-
ClientReplaceOneOptions replaceOptions = ClientReplaceOneOptions.clientReplaceOneOptions();
155-
replaceOptions.upsert(updateOptions.isUpsert());
156-
replaceOptions.sort(updateOptions.getSort());
157-
replaceOptions.hint(updateOptions.getHint());
158-
replaceOptions.hintString(updateOptions.getHintString());
159-
replaceOptions.collation(updateOptions.getCollation());
160-
161-
writeModels.add(ClientNamespacedWriteModel.replaceOne(mongoNamespace, mappedQuery,
162-
sourceAwareDocument.document(), replaceOptions));
123+
writeModels.add(
124+
BulkWriteSupport.replaceOne(mongoNamespace, mappedQuery, sourceAwareDocument.document(), updateOptions));
163125
afterSaveCallables.add(sourceAwareDocument);
164126
}
165127
}
@@ -170,7 +132,8 @@ public ClientBulkWriteResult write(String defaultDatabase, Order order, Bulk bul
170132
ClientBulkWriteOptions.clientBulkWriteOptions().ordered(order.equals(Order.SEQUENTIAL))));
171133

172134
afterSaveCallables.forEach(callable -> {
173-
template.maybeEmitEvent(new AfterSaveEvent<>(callable.source(), callable.document(), callable.collectionName()));
135+
template
136+
.maybeEmitEvent(new AfterSaveEvent<>(callable.source(), callable.document(), callable.collectionName()));
174137
template.maybeCallAfterSave(callable.source(), callable.document(), callable.collectionName());
175138
});
176139
return clientBulkWriteResult;

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoDatabaseFactorySupport.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.springframework.aop.framework.ProxyFactory;
2121
import org.springframework.dao.DataAccessException;
2222
import org.springframework.dao.support.PersistenceExceptionTranslator;
23-
import org.springframework.data.mongodb.MongoClientAware;
23+
import org.springframework.data.mongodb.MongoClusterCapable;
2424
import org.springframework.data.mongodb.MongoDatabaseFactory;
2525
import org.springframework.data.mongodb.SessionAwareMethodInterceptor;
2626
import org.springframework.lang.Contract;
@@ -42,7 +42,7 @@
4242
* @since 3.0
4343
* @see SimpleMongoClientDatabaseFactory
4444
*/
45-
public abstract class MongoDatabaseFactorySupport<C> implements MongoDatabaseFactory, MongoClientAware<C> {
45+
public abstract class MongoDatabaseFactorySupport<C> implements MongoDatabaseFactory, MongoClusterCapable<C> {
4646

4747
private final C mongoClient;
4848
private final String databaseName;
@@ -147,7 +147,7 @@ public MongoDatabaseFactory withSession(ClientSession session) {
147147
/**
148148
* @return the Mongo client object.
149149
*/
150-
public C getMongoClient() {
150+
public C getMongoCluster() {
151151
return mongoClient;
152152
}
153153

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.stream.Collectors;
3838
import java.util.stream.Stream;
3939

40+
import com.mongodb.client.MongoCluster;
4041
import org.apache.commons.logging.Log;
4142
import org.apache.commons.logging.LogFactory;
4243
import org.bson.Document;
@@ -65,7 +66,7 @@
6566
import org.springframework.data.mapping.MappingException;
6667
import org.springframework.data.mapping.callback.EntityCallbacks;
6768
import org.springframework.data.mapping.context.MappingContext;
68-
import org.springframework.data.mongodb.MongoClientAware;
69+
import org.springframework.data.mongodb.MongoClusterCapable;
6970
import org.springframework.data.mongodb.MongoDatabaseFactory;
7071
import org.springframework.data.mongodb.MongoDatabaseUtils;
7172
import org.springframework.data.mongodb.SessionSynchronization;
@@ -671,15 +672,15 @@ protected void executeQuery(Query query, String collectionName, DocumentCallback
671672
}
672673
}
673674

674-
<T> @Nullable T doWithClient(Function<MongoClient, T> callback) {
675+
<T> @Nullable T doWithClient(Function<MongoCluster, T> callback) {
675676

676-
if (!(getMongoDatabaseFactory() instanceof MongoClientAware<?> client)) {
677+
if (!(getMongoDatabaseFactory() instanceof MongoClusterCapable<?> client)) {
677678
throw new IllegalStateException(
678679
"Unable to obtain MongoClient. Does your database factory implement MongoClientAware?");
679680
}
680681

681682
try {
682-
return callback.apply((MongoClient) client.getMongoClient());
683+
return callback.apply((MongoCluster) client.getMongoCluster());
683684
} catch (RuntimeException e) {
684685
throw potentiallyConvertRuntimeException(e, exceptionTranslator);
685686
}

0 commit comments

Comments
 (0)