diff --git a/commons-datastore/commons-datastore-mongodb/src/main/java/org/opencb/commons/datastore/mongodb/MongoDBCollection.java b/commons-datastore/commons-datastore-mongodb/src/main/java/org/opencb/commons/datastore/mongodb/MongoDBCollection.java index 645cc98ef..e6792c6d0 100644 --- a/commons-datastore/commons-datastore-mongodb/src/main/java/org/opencb/commons/datastore/mongodb/MongoDBCollection.java +++ b/commons-datastore/commons-datastore-mongodb/src/main/java/org/opencb/commons/datastore/mongodb/MongoDBCollection.java @@ -444,6 +444,123 @@ public DataResult update(ClientSession clientSession, List queri start); } + /** + * Update documents using an aggregation pipeline. + * + * @param query the filter to select the documents to update + * @param pipeline the aggregation pipeline to apply for the update + * @param options additional options for the query and update operation + * @return a DataResult containing the result of the update operation + */ + public DataResult updateWithPipeline(Bson query, List pipeline, QueryOptions options) { + return updateWithPipeline(null, query, pipeline, options); + } + + /** + * Update documents using an aggregation pipeline. + * + * @param clientSession the client session to use for the operation, or null if not using sessions + * @param query the filter to select the documents to update + * @param pipeline the aggregation pipeline to apply for the update + * @param options additional options for the query and update operation + * @return a DataResult containing the result of the update operation + */ + public DataResult updateWithPipeline(ClientSession clientSession, Bson query, List pipeline, QueryOptions options) { + long start = startQuery(); + + boolean upsert = false; + boolean multi = false; + if (options != null) { + upsert = options.getBoolean(UPSERT); + multi = options.getBoolean(MULTI); + } + + UpdateResult updateResult = mongoDBNativeQuery.updateWithPipeline(clientSession, query, pipeline, upsert, multi); + return endWrite(updateResult.getMatchedCount(), updateResult.getUpsertedId() != null ? 1 : 0, + updateResult.getUpsertedId() == null ? updateResult.getModifiedCount() : 0, 0, 0, start); + } + + /** + * Finds a document matching the given query, applies an aggregation pipeline to update it, and returns the updated document. + * + * @param query the filter to select the document to update + * @param projection the fields to return in the resulting document + * @param sort the sort criteria to apply before finding the document + * @param pipeline the aggregation pipeline to apply for the update + * @param options additional options for the query and update operation + * @return a DataResult containing the updated document, or an empty result if no document matched + */ + public DataResult findAndUpdateWithPipeline(Bson query, Bson projection, Bson sort, + List pipeline, QueryOptions options) { + return privateFindAndUpdateWithPipeline(null, query, projection, sort, pipeline, options, null, null); + } + + /** + * Finds a document matching the given query, applies an aggregation pipeline to update it, and returns the updated document. + * + * @param clientSession the client session to use for the operation, or null if not using sessions + * @param query the filter to select the document to update + * @param projection the fields to return in the resulting document + * @param sort the sort criteria to apply before finding the document + * @param pipeline the aggregation pipeline to apply for the update + * @param options additional options for the query and update operation + * @return a DataResult containing the updated document, or an empty result if no document matched + */ + public DataResult findAndUpdateWithPipeline(ClientSession clientSession, Bson query, Bson projection, Bson sort, + List pipeline, QueryOptions options) { + return privateFindAndUpdateWithPipeline(clientSession, query, projection, sort, pipeline, options, null, null); + } + + /** + * Finds a document matching the given query, applies an aggregation pipeline to update it, and returns the updated document. + * + * @param query the filter to select the document to update + * @param projection the fields to return in the resulting document + * @param sort the sort criteria to apply before finding the document + * @param pipeline the aggregation pipeline to apply for the update + * @param clazz the class type to convert the result to; if null or Document.class, returns a Document + * @param options additional options for the query and update operation + * @param the type of the returned result + * @return a DataResult containing the updated document, or an empty result if no document matched + */ + public DataResult findAndUpdateWithPipeline(Bson query, Bson projection, Bson sort, + List pipeline, Class clazz, QueryOptions options) { + return privateFindAndUpdateWithPipeline(null, query, projection, sort, pipeline, options, clazz, null); + } + + /** + * Finds a document matching the given query, applies an aggregation pipeline to update it, and returns the updated document. + * + * @param clientSession the client session to use for the operation, or null if not using sessions + * @param query the filter to select the document to update + * @param projection the fields to return in the resulting document + * @param sort the sort criteria to apply before finding the document + * @param pipeline the aggregation pipeline to apply for the update + * @param clazz the class type to convert the result to; if null or Document.class, returns a Document + * @param options additional options for the query and update operation + * @param the type of the returned result + * @return a DataResult containing the updated document, or an empty result if no document matched + */ + public DataResult findAndUpdateWithPipeline(ClientSession clientSession, Bson query, Bson projection, Bson sort, + List pipeline, Class clazz, QueryOptions options) { + return privateFindAndUpdateWithPipeline(clientSession, query, projection, sort, pipeline, options, clazz, null); + } + + private DataResult privateFindAndUpdateWithPipeline(ClientSession clientSession, Bson query, Bson projection, Bson sort, + List pipeline, QueryOptions options, Class clazz, + ComplexTypeConverter converter) { + long start = startQuery(); + Document result = mongoDBNativeQuery.findAndUpdateWithPipeline(clientSession, query, projection, sort, pipeline, options); + if (clazz != null && !clazz.equals(Document.class)) { + try { + return endQuery(Collections.singletonList(objectMapper.readValue(objectWriter.writeValueAsString(result), clazz)), start); + } catch (IOException e) { + logger.error("Error deserializing result: " + e.getMessage(), e); + } + } + return endQuery(Collections.singletonList(result), start); + } + public DataResult remove(Bson query, QueryOptions options) { return remove(null, query, options); } diff --git a/commons-datastore/commons-datastore-mongodb/src/main/java/org/opencb/commons/datastore/mongodb/MongoDBNativeQuery.java b/commons-datastore/commons-datastore-mongodb/src/main/java/org/opencb/commons/datastore/mongodb/MongoDBNativeQuery.java index 0876bacdd..c7b33b62d 100644 --- a/commons-datastore/commons-datastore-mongodb/src/main/java/org/opencb/commons/datastore/mongodb/MongoDBNativeQuery.java +++ b/commons-datastore/commons-datastore-mongodb/src/main/java/org/opencb/commons/datastore/mongodb/MongoDBNativeQuery.java @@ -453,6 +453,99 @@ public BulkWriteResult update(ClientSession clientSession, List } } + /** + * Update documents using an aggregation pipeline. + * + * @param query The filter to select the documents to update. + * @param pipeline The aggregation pipeline specifying the update operations. + * @param upsert Whether to insert a new document if no documents match the query. + * @param multi Whether to update multiple documents or just one. + * @return The result of the update operation. + */ + public UpdateResult updateWithPipeline(Bson query, List pipeline, boolean upsert, boolean multi) { + return updateWithPipeline(null, query, pipeline, upsert, multi); + } + + /** + * Update documents using an aggregation pipeline. + * + * @param clientSession Session in which the operation will be performed. Can be null. + * @param query The filter to select the documents to update. + * @param pipeline The aggregation pipeline specifying the update operations. + * @param upsert Whether to insert a new document if no documents match the query. + * @param multi Whether to update multiple documents or just one. + * @return The result of the update operation. + */ + public UpdateResult updateWithPipeline(ClientSession clientSession, Bson query, List pipeline, + boolean upsert, boolean multi) { + UpdateOptions updateOptions = new UpdateOptions().upsert(upsert); + if (multi) { + if (clientSession != null) { + return dbCollection.updateMany(clientSession, query, pipeline, updateOptions); + } else { + return dbCollection.updateMany(query, pipeline, updateOptions); + } + } else { + if (clientSession != null) { + return dbCollection.updateOne(clientSession, query, pipeline, updateOptions); + } else { + return dbCollection.updateOne(query, pipeline, updateOptions); + } + } + } + + /** + * Finds and updates a single document using an aggregation pipeline. + * + * @param query The filter to select the document to update. + * @param projection The fields to return in the resulting document. + * @param sort The sort criteria to apply before updating. + * @param pipeline The aggregation pipeline specifying the update operations. + * @param options Additional options such as upsert and returnNew. + * @return The updated document, or null if no document matched the query. + */ + public Document findAndUpdateWithPipeline(Bson query, Bson projection, Bson sort, + List pipeline, QueryOptions options) { + return findAndUpdateWithPipeline(null, query, projection, sort, pipeline, options); + } + + /** + * Finds and updates a single document using an aggregation pipeline. + * + * @param clientSession Session in which the operation will be performed. Can be null. + * @param query The filter to select the document to update. + * @param projection The fields to return in the resulting document. + * @param sort The sort criteria to apply before updating. + * @param pipeline The aggregation pipeline specifying the update operations. + * @param options Additional options such as upsert and returnNew. + * @return The updated document, or null if no document matched the query. + */ + public Document findAndUpdateWithPipeline(ClientSession clientSession, Bson query, Bson projection, Bson sort, + List pipeline, QueryOptions options) { + boolean upsert = false; + boolean returnNew = false; + + if (options != null) { + if (projection == null) { + projection = getProjection(projection, options); + } + upsert = options.getBoolean("upsert", false); + returnNew = options.getBoolean("returnNew", false); + } + + FindOneAndUpdateOptions findOneAndUpdateOptions = new FindOneAndUpdateOptions() + .sort(sort) + .projection(projection) + .upsert(upsert) + .returnDocument(returnNew ? ReturnDocument.AFTER : ReturnDocument.BEFORE); + + if (clientSession != null) { + return dbCollection.findOneAndUpdate(clientSession, query, pipeline, findOneAndUpdateOptions); + } else { + return dbCollection.findOneAndUpdate(query, pipeline, findOneAndUpdateOptions); + } + } + private IndexOutOfBoundsException wrongQueryUpdateSize(List queries, List updates) { return new IndexOutOfBoundsException("QueryList.size=" + queries.size() + " and UpdatesList.size=" + updates.size() + " must be the same size."); diff --git a/commons-datastore/commons-datastore-mongodb/src/test/java/org/opencb/commons/datastore/mongodb/MongoDBCollectionTest.java b/commons-datastore/commons-datastore-mongodb/src/test/java/org/opencb/commons/datastore/mongodb/MongoDBCollectionTest.java index da5c10398..21bade4aa 100644 --- a/commons-datastore/commons-datastore-mongodb/src/test/java/org/opencb/commons/datastore/mongodb/MongoDBCollectionTest.java +++ b/commons-datastore/commons-datastore-mongodb/src/test/java/org/opencb/commons/datastore/mongodb/MongoDBCollectionTest.java @@ -37,6 +37,8 @@ import java.util.*; import static org.junit.Assert.*; +import static org.opencb.commons.datastore.mongodb.MongoDBCollection.MULTI; +import static org.opencb.commons.datastore.mongodb.MongoDBCollection.UPSERT; import static org.opencb.commons.datastore.mongodb.MongoDBQueryUtils.*; import static org.opencb.commons.datastore.mongodb.MongoDBQueryUtils.Accumulator.*; @@ -1541,7 +1543,7 @@ public void testUpdate5_upsert() throws Exception { } DataResult writeResult = mongoDBCollectionUpdateTest.update(queries, updates, new QueryOptions("multi", false) - .append(MongoDBCollection.UPSERT, true)); + .append(UPSERT, true)); assertEquals(modifiedDocuments, writeResult.getNumUpdated()); assertEquals(numUpserts, writeResult.getNumInserted()); } @@ -1632,4 +1634,226 @@ public void close() throws IOException { } } + @Test + public void testUpdateWithPipeline() throws Exception { + // Create a test document + Document testDoc = new Document("id", 999) + .append("name", "TestUser") + .append("age", 25) + .append("score", 100); + mongoDBCollectionUpdateTest.insert(testDoc, null); + + // Create aggregation pipeline to increment age and double the score + List pipeline = Arrays.asList( + new Document("$set", new Document() + .append("age", new Document("$add", Arrays.asList("$age", 1))) + .append("score", new Document("$multiply", Arrays.asList("$score", 2))) + .append("lastModified", new Date()) + ) + ); + + // Test single document update with pipeline + Bson query = new Document("id", 999); + DataResult updateResult = mongoDBCollectionUpdateTest.updateWithPipeline(query, pipeline, null); + + assertEquals("One document should be matched", 1, updateResult.getNumMatches()); + assertEquals("One document should be updated", 1, updateResult.getNumUpdated()); + + // Verify the update + DataResult findResult = mongoDBCollectionUpdateTest.find(query, null); + Document updatedDoc = findResult.first(); + assertEquals("Age should be incremented", 26, updatedDoc.getInteger("age").intValue()); + assertEquals("Score should be doubled", 200, updatedDoc.getInteger("score").intValue()); + assertNotNull("lastModified should be set", updatedDoc.get("lastModified")); + } + + @Test + public void testUpdateWithPipelineUpsert() throws Exception { + // Test upsert with aggregation pipeline + List pipeline = Arrays.asList( + new Document("$set", new Document() + .append("name", "NewUser") + .append("age", 30) + .append("created", new Date()) + .append("isNew", true) + ) + ); + + Bson query = new Document("id", 888); + QueryOptions options = new QueryOptions(UPSERT, true); + + DataResult updateResult = mongoDBCollectionUpdateTest.updateWithPipeline(query, pipeline, options); + + assertEquals("No documents should be matched for new document", 0, updateResult.getNumMatches()); + assertEquals("One document should be inserted", 1, updateResult.getNumInserted()); + + // Verify the upserted document + DataResult findResult = mongoDBCollectionUpdateTest.find(query, null); + Document upsertedDoc = findResult.first(); + assertEquals("Name should be set", "NewUser", upsertedDoc.getString("name")); + assertEquals("Age should be set", 30, upsertedDoc.getInteger("age").intValue()); + assertTrue("isNew flag should be true", upsertedDoc.getBoolean("isNew")); + } + + @Test + public void testUpdateWithPipelineMulti() throws Exception { + // Insert multiple test documents + List testDocs = Arrays.asList( + new Document("category", "test").append("value", 10), + new Document("category", "test").append("value", 20), + new Document("category", "test").append("value", 30), + new Document("category", "other").append("value", 40) + ); + mongoDBCollectionUpdateTest.insert(testDocs, null); + + // Pipeline to add bonus field based on value + List pipeline = Arrays.asList( + new Document("$set", new Document() + .append("bonus", new Document("$cond", Arrays.asList( + new Document("$gte", Arrays.asList("$value", 20)), + new Document("$multiply", Arrays.asList("$value", 0.1)), + 0 + ))) + ) + ); + + // Update all documents in "test" category + Bson query = new Document("category", "test"); + QueryOptions options = new QueryOptions(MULTI, true); + + DataResult updateResult = mongoDBCollectionUpdateTest.updateWithPipeline(query, pipeline, options); + + assertEquals("Three documents should be matched", 3, updateResult.getNumMatches()); + assertEquals("Three documents should be updated", 3, updateResult.getNumUpdated()); + + // Verify updates + DataResult findResult = mongoDBCollectionUpdateTest.find(query, null); + for (Document doc : findResult.getResults()) { + int value = doc.getInteger("value"); + double expectedBonus = value >= 20 ? value * 0.1 : 0; + assertEquals("Bonus should be calculated correctly", expectedBonus, doc.get("bonus", Number.class).doubleValue(), 0.01); + } + } + + @Test + public void testFindAndUpdateWithPipeline() throws Exception { + // Insert test document + Document testDoc = new Document("id", 777) + .append("counter", 5) + .append("name", "Counter"); + mongoDBCollectionUpdateTest.insert(testDoc, null); + + // Pipeline to increment counter and add timestamp + List pipeline = Arrays.asList( + new Document("$set", new Document() + .append("counter", new Document("$add", Arrays.asList("$counter", 1))) + .append("lastIncrement", new Date()) + ) + ); + + Bson query = new Document("id", 777); + QueryOptions options = new QueryOptions("returnNew", true); + + DataResult result = mongoDBCollectionUpdateTest.findAndUpdateWithPipeline( + query, null, null, pipeline, options); + + assertNotNull("Result should not be null", result); + assertEquals("Should return one document", 1, result.getNumResults()); + + Document updatedDoc = result.first(); + assertEquals("Counter should be incremented", 6, updatedDoc.getInteger("counter").intValue()); + assertNotNull("lastIncrement should be set", updatedDoc.get("lastIncrement")); + } + + @Test + public void testFindAndUpdateWithPipelineReturnOriginal() throws Exception { + // Insert test document + Document testDoc = new Document("id", 666) + .append("version", 1) + .append("data", "original"); + mongoDBCollectionUpdateTest.insert(testDoc, null); + + // Pipeline to increment version and update data + List pipeline = Arrays.asList( + new Document("$set", new Document() + .append("version", new Document("$add", Arrays.asList("$version", 1))) + .append("data", "updated") + ) + ); + + Bson query = new Document("id", 666); + QueryOptions options = new QueryOptions("returnNew", false); // Return original document + + DataResult result = mongoDBCollectionUpdateTest.findAndUpdateWithPipeline( + query, null, null, pipeline, options); + + Document returnedDoc = result.first(); + assertEquals("Should return original version", 1, returnedDoc.getInteger("version").intValue()); + assertEquals("Should return original data", "original", returnedDoc.getString("data")); + + // Verify the document was actually updated + Document currentDoc = mongoDBCollectionUpdateTest.find(query, null).first(); + assertEquals("Current version should be updated", 2, currentDoc.getInteger("version").intValue()); + assertEquals("Current data should be updated", "updated", currentDoc.getString("data")); + } + + @Test + public void testUpdateWithPipelineConditionalLogic() throws Exception { + // Insert test documents with different categories + List testDocs = Arrays.asList( + new Document("test", "conditionalLogic").append("id", 1).append("type", "premium").append("balance", 1000), + new Document("test", "conditionalLogic").append("id", 2).append("type", "standard").append("balance", 500), + new Document("test", "conditionalLogic").append("id", 3).append("type", "basic").append("balance", 100) + ); + mongoDBCollectionUpdateTest.insert(testDocs, null); + + // Pipeline with conditional logic for different account types + List pipeline = Arrays.asList( + new Document("$set", new Document() + .append("interestRate", new Document("$switch", new Document() + .append("branches", Arrays.asList( + new Document("case", new Document("$eq", Arrays.asList("$type", "premium"))) + .append("then", 0.05), + new Document("case", new Document("$eq", Arrays.asList("$type", "standard"))) + .append("then", 0.03) + )) + .append("default", 0.01) + )) + .append("interest", new Document("$multiply", Arrays.asList( + "$balance", + new Document("$switch", new Document() + .append("branches", Arrays.asList( + new Document("case", new Document("$eq", Arrays.asList("$type", "premium"))) + .append("then", 0.05), + new Document("case", new Document("$eq", Arrays.asList("$type", "standard"))) + .append("then", 0.03) + )) + .append("default", 0.01) + ) + ))) + ) + ); + + // Update all documents + QueryOptions options = new QueryOptions(MULTI, true); + DataResult updateResult = mongoDBCollectionUpdateTest.updateWithPipeline(Filters.eq("test", "conditionalLogic"), pipeline, options); + + assertEquals("Three documents should be updated", 3, updateResult.getNumUpdated()); + + // Verify interest calculations + DataResult allDocs = mongoDBCollectionUpdateTest.find(Filters.eq("test", "conditionalLogic"), null); + for (Document doc : allDocs.getResults()) { + String type = doc.getString("type"); + int balance = doc.getInteger("balance"); + double expectedRate; + switch (type) { + case "premium": expectedRate = 0.05; break; + case "standard": expectedRate = 0.03; break; + default: expectedRate = 0.01; + } + assertEquals("Interest rate should match type", expectedRate, doc.getDouble("interestRate"), 0.001); + assertEquals("Interest should be calculated correctly", balance * expectedRate, doc.getDouble("interest"), 0.001); + } + } + }