| 
85 | 85 | import org.assertj.core.api.HamcrestCondition;  | 
86 | 86 | import org.assertj.core.api.InstanceOfAssertFactories;  | 
87 | 87 | import org.junit.jupiter.api.Test;  | 
 | 88 | +import org.junit.jupiter.params.ParameterizedTest;  | 
 | 89 | +import org.junit.jupiter.params.provider.Arguments;  | 
 | 90 | +import org.junit.jupiter.params.provider.MethodSource;  | 
88 | 91 | 
 
  | 
89 | 92 | import javax.annotation.Nullable;  | 
90 | 93 | 
 
  | 
 | 
96 | 99 | import java.util.Optional;  | 
97 | 100 | import java.util.Set;  | 
98 | 101 | import java.util.TreeMap;  | 
 | 102 | +import java.util.function.Consumer;  | 
99 | 103 | import java.util.stream.Collectors;  | 
 | 104 | +import java.util.stream.Stream;  | 
100 | 105 | 
 
  | 
101 | 106 | import static org.apache.flink.table.api.Expressions.$;  | 
102 | 107 | import static org.apache.flink.table.planner.utils.OperationMatchers.entry;  | 
@@ -1404,97 +1409,93 @@ void testFailedToAlterTableDropDistribution() throws Exception {  | 
1404 | 1409 |         checkAlterNonExistTable("alter table %s nonexistent drop watermark");  | 
1405 | 1410 |     }  | 
1406 | 1411 | 
 
  | 
1407 |  | -    @Test  | 
1408 |  | -    void createMaterializedTableWithRefreshModeContinuous() throws Exception {  | 
1409 |  | -        final String sql =  | 
1410 |  | -                "CREATE MATERIALIZED TABLE users_shops ("  | 
1411 |  | -                        + " PRIMARY KEY (user_id) not enforced)"  | 
1412 |  | -                        + " WITH(\n"  | 
1413 |  | -                        + "   'format' = 'debezium-json'\n"  | 
1414 |  | -                        + " )\n"  | 
1415 |  | -                        + " FRESHNESS = INTERVAL '30' SECOND\n"  | 
1416 |  | -                        + " REFRESH_MODE = CONTINUOUS\n"  | 
1417 |  | -                        + " AS SELECT 1 as shop_id, 2 as user_id ";  | 
1418 |  | - | 
1419 |  | -        final String expectedSummaryString =  | 
1420 |  | -                "CREATE MATERIALIZED TABLE: (materializedTable: "  | 
1421 |  | -                        + "[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n"  | 
1422 |  | -                        + "  `shop_id` INT NOT NULL,\n"  | 
1423 |  | -                        + "  `user_id` INT NOT NULL,\n"  | 
1424 |  | -                        + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n"  | 
1425 |  | -                        + "), comment='null', distribution=null, partitionKeys=[], "  | 
1426 |  | -                        + "options={format=debezium-json}, snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "  | 
1427 |  | -                        + "freshness=INTERVAL '30' SECOND, logicalRefreshMode=CONTINUOUS, refreshMode=CONTINUOUS, "  | 
1428 |  | -                        + "refreshStatus=INITIALIZING, refreshHandlerDescription='null', serializedRefreshHandler=null}, resolvedSchema=(\n"  | 
1429 |  | -                        + "  `shop_id` INT NOT NULL,\n"  | 
1430 |  | -                        + "  `user_id` INT NOT NULL,\n"  | 
1431 |  | -                        + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n"  | 
1432 |  | -                        + ")}], identifier: [`builtin`.`default`.`users_shops`])";  | 
 | 1412 | +    @ParameterizedTest(name = "[{index}] {0}")  | 
 | 1413 | +    @MethodSource("provideCreateMaterializedTableTestCases")  | 
 | 1414 | +    void createMaterializedTableWithVariousOptions(  | 
 | 1415 | +            String testName,  | 
 | 1416 | +            String sql,  | 
 | 1417 | +            String expectedSummaryString,  | 
 | 1418 | +            Consumer<CreateMaterializedTableOperation> additionalAssertions) {  | 
1433 | 1419 | 
 
  | 
1434 | 1420 |         final Operation operation = parse(sql);  | 
1435 | 1421 | 
 
  | 
1436 | 1422 |         assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);  | 
1437 | 1423 |         assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);  | 
 | 1424 | + | 
1438 | 1425 |         final CreateMaterializedTableOperation createMaterializedTableOperation =  | 
1439 | 1426 |                 (CreateMaterializedTableOperation) operation;  | 
1440 |  | -        assertThat(  | 
1441 |  | -                        createMaterializedTableOperation  | 
1442 |  | -                                .getCatalogMaterializedTable()  | 
1443 |  | -                                .getDefinitionFreshness())  | 
1444 |  | -                .isEqualTo(IntervalFreshness.ofSecond("30"));  | 
1445 |  | -        assertThat(createMaterializedTableOperation.getCatalogMaterializedTable().getRefreshMode())  | 
1446 |  | -                .isSameAs(RefreshMode.CONTINUOUS);  | 
1447 | 1427 | 
 
  | 
1448 |  | -        prepareMaterializedTable("tb2", false, 1, null, "SELECT 1");  | 
1449 |  | -    }  | 
1450 |  | - | 
1451 |  | -    @Test  | 
1452 |  | -    void createMaterializedTableWithDistribution() throws Exception {  | 
1453 |  | -        final String sql =  | 
1454 |  | -                "CREATE MATERIALIZED TABLE users_shops ("  | 
1455 |  | -                        + " PRIMARY KEY (user_id) not enforced)"  | 
1456 |  | -                        + " DISTRIBUTED BY HASH (user_id) INTO 7 BUCKETS\n"  | 
1457 |  | -                        + " WITH(\n"  | 
1458 |  | -                        + "   'format' = 'debezium-json'\n"  | 
1459 |  | -                        + " )\n"  | 
1460 |  | -                        + " FRESHNESS = INTERVAL '30' SECOND\n"  | 
1461 |  | -                        + " AS SELECT 1 as shop_id, 2 as user_id ";  | 
1462 |  | - | 
1463 |  | -        final String expectedSummaryString =  | 
1464 |  | -                "CREATE MATERIALIZED TABLE: (materializedTable: "  | 
1465 |  | -                        + "[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n"  | 
1466 |  | -                        + "  `shop_id` INT NOT NULL,\n"  | 
1467 |  | -                        + "  `user_id` INT NOT NULL,\n"  | 
1468 |  | -                        + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n"  | 
1469 |  | -                        + "), comment='null', distribution=DISTRIBUTED BY HASH(`user_id`) INTO 7 BUCKETS, partitionKeys=[], "  | 
1470 |  | -                        + "options={format=debezium-json}, snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "  | 
1471 |  | -                        + "freshness=INTERVAL '30' SECOND, logicalRefreshMode=AUTOMATIC, refreshMode=null, "  | 
1472 |  | -                        + "refreshStatus=INITIALIZING, refreshHandlerDescription='null', serializedRefreshHandler=null}, resolvedSchema=(\n"  | 
1473 |  | -                        + "  `shop_id` INT NOT NULL,\n"  | 
1474 |  | -                        + "  `user_id` INT NOT NULL,\n"  | 
1475 |  | -                        + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n"  | 
1476 |  | -                        + ")}], identifier: [`builtin`.`default`.`users_shops`])";  | 
1477 |  | - | 
1478 |  | -        final Operation operation = parse(sql);  | 
1479 |  | - | 
1480 |  | -        assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);  | 
1481 |  | -        assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);  | 
1482 |  | -        assertThat(  | 
1483 |  | -                        ((CreateMaterializedTableOperation) operation)  | 
1484 |  | -                                .getCatalogMaterializedTable()  | 
1485 |  | -                                .getDistribution()  | 
1486 |  | -                                .get())  | 
1487 |  | -                .isEqualTo(TableDistribution.of(Kind.HASH, 7, List.of("user_id")));  | 
1488 |  | - | 
1489 |  | -        prepareMaterializedTable("tb2", false, 1, null, "SELECT 1");  | 
1490 |  | - | 
1491 |  | -        assertThatThrownBy(  | 
1492 |  | -                        () ->  | 
1493 |  | -                                parse(  | 
1494 |  | -                                        "alter MATERIALIZED table cat1.db1.tb2 modify distribution into 3 buckets"))  | 
1495 |  | -                .isInstanceOf(ValidationException.class)  | 
1496 |  | -                .hasMessageContaining(  | 
1497 |  | -                        "Materialized table `cat1`.`db1`.`tb2` does not have a distribution to modify.");  | 
 | 1428 | +        additionalAssertions.accept(createMaterializedTableOperation);  | 
 | 1429 | +    }  | 
 | 1430 | + | 
 | 1431 | +    private static Stream<Arguments> provideCreateMaterializedTableTestCases() {  | 
 | 1432 | +        return Stream.of(  | 
 | 1433 | +                Arguments.of(  | 
 | 1434 | +                        "with refresh mode continuous",  | 
 | 1435 | +                        "CREATE MATERIALIZED TABLE users_shops ("  | 
 | 1436 | +                                + " PRIMARY KEY (user_id) not enforced)"  | 
 | 1437 | +                                + " WITH(\n"  | 
 | 1438 | +                                + "   'format' = 'debezium-json'\n"  | 
 | 1439 | +                                + " )\n"  | 
 | 1440 | +                                + " FRESHNESS = INTERVAL '30' SECOND\n"  | 
 | 1441 | +                                + " REFRESH_MODE = CONTINUOUS\n"  | 
 | 1442 | +                                + " AS SELECT 1 as shop_id, 2 as user_id ",  | 
 | 1443 | +                        "CREATE MATERIALIZED TABLE: (materializedTable: "  | 
 | 1444 | +                                + "[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n"  | 
 | 1445 | +                                + "  `shop_id` INT NOT NULL,\n"  | 
 | 1446 | +                                + "  `user_id` INT NOT NULL,\n"  | 
 | 1447 | +                                + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n"  | 
 | 1448 | +                                + "), comment='null', distribution=null, partitionKeys=[], "  | 
 | 1449 | +                                + "options={format=debezium-json}, snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "  | 
 | 1450 | +                                + "freshness=INTERVAL '30' SECOND, logicalRefreshMode=CONTINUOUS, refreshMode=CONTINUOUS, "  | 
 | 1451 | +                                + "refreshStatus=INITIALIZING, refreshHandlerDescription='null', serializedRefreshHandler=null}, resolvedSchema=(\n"  | 
 | 1452 | +                                + "  `shop_id` INT NOT NULL,\n"  | 
 | 1453 | +                                + "  `user_id` INT NOT NULL,\n"  | 
 | 1454 | +                                + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n"  | 
 | 1455 | +                                + ")}], identifier: [`builtin`.`default`.`users_shops`])",  | 
 | 1456 | +                        (Consumer<CreateMaterializedTableOperation>)  | 
 | 1457 | +                                op -> {  | 
 | 1458 | +                                    assertThat(  | 
 | 1459 | +                                                    op.getCatalogMaterializedTable()  | 
 | 1460 | +                                                            .getDefinitionFreshness())  | 
 | 1461 | +                                            .isEqualTo(IntervalFreshness.ofSecond("30"));  | 
 | 1462 | +                                    assertThat(op.getCatalogMaterializedTable().getRefreshMode())  | 
 | 1463 | +                                            .isSameAs(RefreshMode.CONTINUOUS);  | 
 | 1464 | +                                }),  | 
 | 1465 | +                Arguments.of(  | 
 | 1466 | +                        "with distribution",  | 
 | 1467 | +                        "CREATE MATERIALIZED TABLE users_shops ("  | 
 | 1468 | +                                + " PRIMARY KEY (user_id) not enforced)"  | 
 | 1469 | +                                + " DISTRIBUTED BY HASH (user_id) INTO 7 BUCKETS\n"  | 
 | 1470 | +                                + " WITH(\n"  | 
 | 1471 | +                                + "   'format' = 'debezium-json'\n"  | 
 | 1472 | +                                + " )\n"  | 
 | 1473 | +                                + " FRESHNESS = INTERVAL '30' SECOND\n"  | 
 | 1474 | +                                + " AS SELECT 1 as shop_id, 2 as user_id ",  | 
 | 1475 | +                        "CREATE MATERIALIZED TABLE: (materializedTable: "  | 
 | 1476 | +                                + "[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n"  | 
 | 1477 | +                                + "  `shop_id` INT NOT NULL,\n"  | 
 | 1478 | +                                + "  `user_id` INT NOT NULL,\n"  | 
 | 1479 | +                                + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n"  | 
 | 1480 | +                                + "), comment='null', distribution=DISTRIBUTED BY HASH(`user_id`) INTO 7 BUCKETS, partitionKeys=[], "  | 
 | 1481 | +                                + "options={format=debezium-json}, snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "  | 
 | 1482 | +                                + "freshness=INTERVAL '30' SECOND, logicalRefreshMode=AUTOMATIC, refreshMode=null, "  | 
 | 1483 | +                                + "refreshStatus=INITIALIZING, refreshHandlerDescription='null', serializedRefreshHandler=null}, resolvedSchema=(\n"  | 
 | 1484 | +                                + "  `shop_id` INT NOT NULL,\n"  | 
 | 1485 | +                                + "  `user_id` INT NOT NULL,\n"  | 
 | 1486 | +                                + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n"  | 
 | 1487 | +                                + ")}], identifier: [`builtin`.`default`.`users_shops`])",  | 
 | 1488 | +                        (Consumer<CreateMaterializedTableOperation>)  | 
 | 1489 | +                                op ->  | 
 | 1490 | +                                        assertThat(  | 
 | 1491 | +                                                        op.getCatalogMaterializedTable()  | 
 | 1492 | +                                                                .getDistribution()  | 
 | 1493 | +                                                                .get())  | 
 | 1494 | +                                                .isEqualTo(  | 
 | 1495 | +                                                        TableDistribution.of(  | 
 | 1496 | +                                                                Kind.HASH,  | 
 | 1497 | +                                                                7,  | 
 | 1498 | +                                                                List.of("user_id")))));  | 
1498 | 1499 |     }  | 
1499 | 1500 | 
 
  | 
1500 | 1501 |     @Test  | 
 | 
0 commit comments