diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index a28ac52df41f..3ce51127b151 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory; import org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggFactory; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataField; @@ -548,9 +549,13 @@ private Map> createFieldAggregators( String aggFuncName = getAggFuncName(options, fieldName); if (aggFuncName != null) { + // last_non_null_value doesn't require sequence group checkArgument( - !fieldSeqComparators.isEmpty(), - "Must use sequence group for aggregation functions."); + aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME) + || fieldSeqComparators.containsKey( + fieldNames.indexOf(fieldName)), + "Must use sequence group for aggregation functions but not found for field %s.", + fieldName); fieldAggregators.put( i, () -> diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java index 529110cabcf8..28625a9bf330 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java @@ -21,6 +21,7 @@ import org.apache.paimon.KeyValue; import org.apache.paimon.data.GenericRow; import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -31,6 +32,7 @@ import org.junit.jupiter.api.Test; import static org.apache.paimon.CoreOptions.FIELDS_DEFAULT_AGG_FUNC; +import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -820,14 +822,39 @@ public void testMultiSequenceFieldsPartialUpdateWithAggregationProjectPushDown() @Test public void testAggregationWithoutSequenceGroup() { - Options options = new Options(); - options.set("fields.f1.aggregate-function", "listagg"); - RowType rowType = RowType.of(DataTypes.INT(), DataTypes.INT()); + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT() + }, + new String[] {"pk", "f0", "g0", "f1", "g1"}); + + Options options1 = new Options(); + options1.set("fields.f0.aggregate-function", "listagg"); + options1.set("fields.f1.aggregate-function", "listagg"); assertThatThrownBy( () -> PartialUpdateMergeFunction.factory( - options, rowType, ImmutableList.of("f0"))) - .hasMessageContaining("Must use sequence group for aggregation functions"); + options1, rowType, ImmutableList.of("pk"))) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Must use sequence group for aggregation functions but not found for field f0.")); + + Options options2 = new Options(options1.toMap()); + options2.set("fields.g0.sequence-group", "f0"); + assertThatThrownBy( + () -> + PartialUpdateMergeFunction.factory( + options2, rowType, ImmutableList.of("pk"))) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Must use sequence group for aggregation functions but not found for field f1.")); } private void add(MergeFunction function, Integer... f) {