From 9a4cdc551859c3897254cef60ac380d82798e3cf Mon Sep 17 00:00:00 2001 From: yuzelin Date: Fri, 7 Feb 2025 15:00:38 +0800 Subject: [PATCH 1/2] [core] Check that all fields with aggregate functions in partial-update should be protected by sequence-group --- .../compact/PartialUpdateMergeFunction.java | 5 ++- .../PartialUpdateMergeFunctionTest.java | 37 ++++++++++++++++--- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index a28ac52df41f..b6881f230e9f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -549,8 +549,9 @@ private Map> createFieldAggregators( String aggFuncName = getAggFuncName(options, fieldName); if (aggFuncName != null) { checkArgument( - !fieldSeqComparators.isEmpty(), - "Must use sequence group for aggregation functions."); + fieldSeqComparators.containsKey(fieldNames.indexOf(fieldName)), + "Must use sequence group for aggregation functions but not found for field %s.", + fieldName); fieldAggregators.put( i, () -> diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java index 529110cabcf8..28625a9bf330 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java @@ -21,6 +21,7 @@ import org.apache.paimon.KeyValue; import org.apache.paimon.data.GenericRow; import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -31,6 +32,7 @@ import org.junit.jupiter.api.Test; import static org.apache.paimon.CoreOptions.FIELDS_DEFAULT_AGG_FUNC; +import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -820,14 +822,39 @@ public void testMultiSequenceFieldsPartialUpdateWithAggregationProjectPushDown() @Test public void testAggregationWithoutSequenceGroup() { - Options options = new Options(); - options.set("fields.f1.aggregate-function", "listagg"); - RowType rowType = RowType.of(DataTypes.INT(), DataTypes.INT()); + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT() + }, + new String[] {"pk", "f0", "g0", "f1", "g1"}); + + Options options1 = new Options(); + options1.set("fields.f0.aggregate-function", "listagg"); + options1.set("fields.f1.aggregate-function", "listagg"); assertThatThrownBy( () -> PartialUpdateMergeFunction.factory( - options, rowType, ImmutableList.of("f0"))) - .hasMessageContaining("Must use sequence group for aggregation functions"); + options1, rowType, ImmutableList.of("pk"))) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Must use sequence group for aggregation functions but not found for field f0.")); + + Options options2 = new Options(options1.toMap()); + options2.set("fields.g0.sequence-group", "f0"); + assertThatThrownBy( + () -> + PartialUpdateMergeFunction.factory( + options2, rowType, ImmutableList.of("pk"))) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Must use sequence group for aggregation functions but not found for field f1.")); } private void add(MergeFunction function, Integer... f) { From 308231d631e8a9fceb39dbe95e82e98e1878d0b3 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Fri, 7 Feb 2025 17:46:07 +0800 Subject: [PATCH 2/2] fix --- .../mergetree/compact/PartialUpdateMergeFunction.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index b6881f230e9f..3ce51127b151 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory; import org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggFactory; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataField; @@ -548,8 +549,11 @@ private Map> createFieldAggregators( String aggFuncName = getAggFuncName(options, fieldName); if (aggFuncName != null) { + // last_non_null_value doesn't require sequence group checkArgument( - fieldSeqComparators.containsKey(fieldNames.indexOf(fieldName)), + aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME) + || fieldSeqComparators.containsKey( + fieldNames.indexOf(fieldName)), "Must use sequence group for aggregation functions but not found for field %s.", fieldName); fieldAggregators.put(