Skip to content

Commit 263995f

Browse files
David Turanskiartembilan
authored andcommitted
INT-4568: Add ReactiveMongoDBMessageSource
JIRA: https://jira.spring.io/browse/INT-4568 Clean up per review and some updates to `mongodb.adoc`
1 parent 4b05168 commit 263995f

File tree

4 files changed

+509
-15
lines changed

4 files changed

+509
-15
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mongodb.inbound;
18+
19+
import org.reactivestreams.Publisher;
20+
21+
import org.springframework.beans.BeansException;
22+
import org.springframework.context.ApplicationContext;
23+
import org.springframework.context.ApplicationContextAware;
24+
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
25+
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
26+
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
27+
import org.springframework.data.mongodb.core.convert.MongoConverter;
28+
import org.springframework.data.mongodb.core.query.BasicQuery;
29+
import org.springframework.data.mongodb.core.query.Query;
30+
import org.springframework.expression.Expression;
31+
import org.springframework.expression.TypeLocator;
32+
import org.springframework.expression.common.LiteralExpression;
33+
import org.springframework.expression.spel.support.StandardEvaluationContext;
34+
import org.springframework.expression.spel.support.StandardTypeLocator;
35+
import org.springframework.integration.endpoint.AbstractMessageSource;
36+
import org.springframework.integration.expression.ExpressionUtils;
37+
import org.springframework.integration.mongodb.support.MongoHeaders;
38+
import org.springframework.util.Assert;
39+
40+
import com.mongodb.DBObject;
41+
import reactor.core.publisher.Flux;
42+
import reactor.core.publisher.Mono;
43+
44+
/**
45+
* An instance of {@link org.springframework.integration.core.MessageSource} which returns
46+
* a {@link org.springframework.messaging.Message} with a payload which is the result of
47+
* execution of a {@link Query}. When {@code expectSingleResult} is false (default), the MongoDb
48+
* {@link Query} is executed using {@link ReactiveMongoOperations#find(Query, Class)} method which
49+
* returns a {@link Flux}. The returned {@link Flux} will be used as the payload of the
50+
* {@link org.springframework.messaging.Message} returned by the {@link #receive()}
51+
* method.
52+
* <p>
53+
* When {@code expectSingleResult} is true, the {@link ReactiveMongoOperations#findOne(Query, Class)} is
54+
* used instead, and the message payload will be a {@link Mono} for the single object returned from the
55+
* query.
56+
*
57+
* @author David Turanski
58+
*
59+
* @since 5.3
60+
*/
61+
public class ReactiveMongoDbMessageSource extends AbstractMessageSource<Publisher<?>>
62+
implements ApplicationContextAware {
63+
64+
private final Expression queryExpression;
65+
66+
private Expression collectionNameExpression = new LiteralExpression("data");
67+
68+
private StandardEvaluationContext evaluationContext;
69+
70+
private ReactiveMongoOperations reactiveMongoTemplate;
71+
72+
private MongoConverter mongoConverter;
73+
74+
private ReactiveMongoDatabaseFactory reactiveMongoDatabaseFactory;
75+
76+
private Class<?> entityClass = DBObject.class;
77+
78+
private boolean expectSingleResult = false;
79+
80+
private ApplicationContext applicationContext;
81+
82+
private volatile boolean initialized = false;
83+
84+
/**
85+
* Create an instance with the provided {@link ReactiveMongoDatabaseFactory} and SpEL expression
86+
* which should resolve to a MongoDb 'query' string (see https://www.mongodb.org/display/DOCS/Querying).
87+
* The 'queryExpression' will be evaluated on every call to the {@link #receive()} method.
88+
* @param reactiveMongoDatabaseFactory The reactiveMongoDatabaseFactory factory.
89+
* @param queryExpression The query expression.
90+
*/
91+
public ReactiveMongoDbMessageSource(ReactiveMongoDatabaseFactory reactiveMongoDatabaseFactory,
92+
Expression queryExpression) {
93+
94+
Assert.notNull(reactiveMongoDatabaseFactory, "'reactiveMongoDatabaseFactory' must not be null");
95+
Assert.notNull(queryExpression, "'queryExpression' must not be null");
96+
97+
this.reactiveMongoDatabaseFactory = reactiveMongoDatabaseFactory;
98+
this.queryExpression = queryExpression;
99+
}
100+
101+
/**
102+
* Create an instance with the provided {@link ReactiveMongoOperations} and SpEL expression
103+
* which should resolve to a Mongo 'query' string (see https://www.mongodb.org/display/DOCS/Querying).
104+
* It assumes that the {@link ReactiveMongoOperations} is fully initialized and ready to be used.
105+
* The 'queryExpression' will be evaluated on every call to the {@link #receive()} method.
106+
* @param reactiveMongoTemplate The reactive Mongo template.
107+
* @param queryExpression The query expression.
108+
*/
109+
public ReactiveMongoDbMessageSource(ReactiveMongoOperations reactiveMongoTemplate, Expression queryExpression) {
110+
Assert.notNull(reactiveMongoTemplate, "'reactiveMongoTemplate' must not be null");
111+
Assert.notNull(queryExpression, "'queryExpression' must not be null");
112+
this.reactiveMongoTemplate = reactiveMongoTemplate;
113+
this.queryExpression = queryExpression;
114+
}
115+
116+
/**
117+
* Allow you to set the type of the entityClass that will be passed to the
118+
* {@link ReactiveMongoTemplate#find(Query, Class)} or {@link ReactiveMongoTemplate#findOne(Query, Class)}
119+
* method.
120+
* Default is {@link DBObject}.
121+
* @param entityClass The entity class.
122+
*/
123+
public void setEntityClass(Class<?> entityClass) {
124+
Assert.notNull(entityClass, "'entityClass' must not be null");
125+
this.entityClass = entityClass;
126+
}
127+
128+
/**
129+
* Allow you to manage which find* method to invoke on {@link ReactiveMongoTemplate}.
130+
* Default is 'false', which means the {@link #receive()} method will use
131+
* the {@link ReactiveMongoTemplate#find(Query, Class)} method. If set to 'true',
132+
* {@link #receive()} will use {@link ReactiveMongoTemplate#findOne(Query, Class)},
133+
* and the payload of the returned {@link org.springframework.messaging.Message}
134+
* will be the returned target Object of type
135+
* identified by {@link #entityClass} instead of a List.
136+
* @param expectSingleResult true if a single result is expected.
137+
*/
138+
public void setExpectSingleResult(boolean expectSingleResult) {
139+
this.expectSingleResult = expectSingleResult;
140+
}
141+
142+
/**
143+
* Set the SpEL {@link Expression} that should resolve to a collection name
144+
* used by the {@link Query}. The resulting collection name will be included
145+
* in the {@link MongoHeaders#COLLECTION_NAME} header.
146+
* @param collectionNameExpression The collection name expression.
147+
*/
148+
public void setCollectionNameExpression(Expression collectionNameExpression) {
149+
Assert.notNull(collectionNameExpression, "'collectionNameExpression' must not be null");
150+
this.collectionNameExpression = collectionNameExpression;
151+
}
152+
153+
/**
154+
* Allow you to provide a custom {@link MongoConverter} used to assist in deserialization
155+
* data read from MongoDb. Only allowed if this instance was constructed with a
156+
* {@link ReactiveMongoDatabaseFactory}.
157+
* @param mongoConverter The mongo converter.
158+
*/
159+
public void setMongoConverter(MongoConverter mongoConverter) {
160+
Assert.isNull(this.reactiveMongoTemplate,
161+
"'mongoConverter' can not be set when instance was constructed with ReactiveMongoTemplate");
162+
this.mongoConverter = mongoConverter;
163+
}
164+
165+
@Override
166+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
167+
this.applicationContext = applicationContext;
168+
}
169+
170+
@Override
171+
public String getComponentType() {
172+
return "mongo:reactive-inbound-channel-adapter";
173+
}
174+
175+
@Override
176+
protected void onInit() {
177+
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
178+
TypeLocator typeLocator = this.evaluationContext.getTypeLocator();
179+
if (typeLocator instanceof StandardTypeLocator) {
180+
//Register MongoDB query API package so FQCN can be avoided in query-expression.
181+
((StandardTypeLocator) typeLocator).registerImport("org.springframework.data.mongodb.core.query");
182+
}
183+
if (this.reactiveMongoTemplate == null) {
184+
ReactiveMongoTemplate template =
185+
new ReactiveMongoTemplate(this.reactiveMongoDatabaseFactory, this.mongoConverter);
186+
if (this.applicationContext != null) {
187+
template.setApplicationContext(this.applicationContext);
188+
}
189+
this.reactiveMongoTemplate = template;
190+
}
191+
this.initialized = true;
192+
}
193+
194+
/**
195+
* Execute a {@link Query} returning its results as the Message payload.
196+
* The payload can be either {@link Flux} or {@link Mono} of objects of type
197+
* identified by {@link #entityClass}, or a single element of type identified by {@link #entityClass}
198+
* based on the value of {@link #expectSingleResult} attribute which defaults to 'false' resulting
199+
* {@link org.springframework.messaging.Message} with payload of type
200+
* {@link Flux}. The collection name used in the
201+
* query will be provided in the {@link MongoHeaders#COLLECTION_NAME} header.
202+
*/
203+
@Override
204+
public Object doReceive() {
205+
Assert.isTrue(this.initialized, "This class is not yet initialized. Invoke its afterPropertiesSet() method");
206+
Object value = this.queryExpression.getValue(this.evaluationContext);
207+
Assert.notNull(value, "'queryExpression' must not evaluate to null");
208+
Query query = null;
209+
if (value instanceof String) {
210+
query = new BasicQuery((String) value);
211+
}
212+
else if (value instanceof Query) {
213+
query = ((Query) value);
214+
}
215+
else {
216+
throw new IllegalStateException("'queryExpression' must evaluate to String " +
217+
"or org.springframework.data.mongodb.core.query.Query, but not: " + query);
218+
}
219+
220+
String collectionName = this.collectionNameExpression.getValue(this.evaluationContext, String.class);
221+
Assert.notNull(collectionName, "'collectionNameExpression' must not evaluate to null");
222+
223+
Object result;
224+
if (this.expectSingleResult) {
225+
result = this.reactiveMongoTemplate.findOne(query, this.entityClass, collectionName);
226+
}
227+
else {
228+
result = this.reactiveMongoTemplate.find(query, this.entityClass, collectionName);
229+
}
230+
return getMessageBuilderFactory()
231+
.withPayload(result)
232+
.setHeader(MongoHeaders.COLLECTION_NAME, collectionName);
233+
}
234+
235+
}

0 commit comments

Comments
 (0)