support create table like in flink catalog and watermark in windows #12116

Open
wants to merge 3 commits into base: main

@@ -97,17 +97,20 @@ public class FlinkCatalog extends AbstractCatalog {
private final Namespace baseNamespace;
private final SupportsNamespaces asNamespaceCatalog;
private final Closeable closeable;
private final Map<String, String> catalogProps;
private final boolean cacheEnabled;

public FlinkCatalog(
String catalogName,
String defaultDatabase,
Namespace baseNamespace,
CatalogLoader catalogLoader,
Map<String, String> catalogProps,
boolean cacheEnabled,
long cacheExpirationIntervalMs) {
super(catalogName, defaultDatabase);
this.catalogLoader = catalogLoader;
this.catalogProps = catalogProps;
this.baseNamespace = baseNamespace;
this.cacheEnabled = cacheEnabled;

@@ -332,7 +335,15 @@ public List<String> listTables(String databaseName)
public CatalogTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException {
Table table = loadIcebergTable(tablePath);
return toCatalogTable(table);
Map<String, String> catalogAndTableProps = Maps.newHashMap(catalogProps);
catalogAndTableProps.put(FlinkCreateTableOptions.CATALOG_NAME.key(), getName());
catalogAndTableProps.put(
FlinkCreateTableOptions.CATALOG_DATABASE.key(), tablePath.getDatabaseName());
catalogAndTableProps.put(
FlinkCreateTableOptions.CATALOG_TABLE.key(), tablePath.getObjectName());
catalogAndTableProps.put("connector", FlinkDynamicTableFactory.FACTORY_IDENTIFIER);
catalogAndTableProps.putAll(table.properties());
return toCatalogTableWithProps(table, catalogAndTableProps);
}

private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
@@ -384,13 +395,6 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws CatalogException, TableAlreadyExistException {
if (Objects.equals(
table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
throw new IllegalArgumentException(
"Cannot create the table with 'connector'='iceberg' table property in "
+ "an iceberg catalog, Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
+ "create table without 'connector'='iceberg' related properties in an iceberg table.");
}
Comment on lines -387 to -393

Contributor

Why do we remove this check?

Author
@swapna267 swapna267 Jan 29, 2025

Tables can be created using LIKE in:

  1. Flink catalog - not supported currently.
  2. Another table in the Iceberg catalog itself, as detailed in the docs.

This check fails if we try to create a table using LIKE in the Iceberg catalog (case #2) when connector=iceberg is in the options. For example, a DDL like the one below:

CREATE TABLE  `hive_catalog`.`default`.`sample_like` 
LIKE `hive_catalog`.`default`.`sample`
WITH ('connector'='iceberg')

In order to support case #1 without the user setting any extra options in the WITH clause, we need to add the connector option in getTable:

catalogAndTableProps.put("connector", FlinkDynamicTableFactory.FACTORY_IDENTIFIER);

This check was added in a very old PR,
#2666
#2666 (comment) where Flink SQL didn't support CREATE TABLE A LIKE B, where A and B are in different catalogs.

So, by removing this check we are ignoring the connector option being passed, and the following DDL can create a table table_like in the Flink catalog backed by iceberg_catalog.db.table. As we know the source table is an Iceberg table, adding connector=iceberg would be redundant.

CREATE TABLE table_like (
      eventTS AS CAST(t1 AS TIMESTAMP(3))
) LIKE iceberg_catalog.db.table;
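
For comparison, this is roughly what a user would otherwise have to spell out by hand as a connector table. A minimal sketch, assuming a Hive-backed catalog registered as iceberg_catalog; the column t1 and the uri value are illustrative placeholders:

CREATE TABLE `default_catalog`.`default_database`.`table_like` (
  t1 TIMESTAMP(6),
  -- millisecond-precision computed column derived from the physical column
  eventTS AS CAST(t1 AS TIMESTAMP(3))
) WITH (
  'connector' = 'iceberg',
  'catalog-type' = 'hive',
  'catalog-name' = 'iceberg_catalog',
  'catalog-database' = 'db',
  'catalog-table' = 'table',
  'uri' = 'thrift://localhost:9083'
);

With the options injected in getTable, the LIKE statement picks these up from the source table automatically, which is what the new testCreateTableLikeInFlinkCatalog test checks for the catalog-name, catalog-database and catalog-table options.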

Contributor

What happens when the source table is not an Iceberg table?
I'm trying to understand where we get the missing information in this case, and whether we have a way to check that we actually get it. If we can create such a check, then we can still throw an exception when we don't get this information from any source.

Preconditions.checkArgument(table instanceof ResolvedCatalogTable, "table should be resolved");
createIcebergTable(tablePath, (ResolvedCatalogTable) table, ignoreIfExists);
}
@@ -625,7 +629,7 @@ private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSc
return partitionKeysBuilder.build();
}

static CatalogTable toCatalogTable(Table table) {
static CatalogTable toCatalogTableWithProps(Table table, Map<String, String> props) {
TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());

@@ -634,7 +638,11 @@ static CatalogTable toCatalogTable(Table table) {
// CatalogTableImpl to copy a new catalog table.
// Let's re-loading table from Iceberg catalog when creating source/sink operators.
// Iceberg does not have Table comment, so pass a null (Default comment value in Flink).
return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);
return new CatalogTableImpl(schema, partitionKeys, props, null);
}

static CatalogTable toCatalogTable(Table table) {
return toCatalogTableWithProps(table, table.properties());
}

@Override
@@ -168,6 +168,7 @@ protected Catalog createCatalog(
defaultDatabase,
baseNamespace,
catalogLoader,
properties,
cacheEnabled,
cacheExpirationIntervalMs);
}
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class FlinkCreateTableOptions {

private FlinkCreateTableOptions() {}

public static final ConfigOption<String> CATALOG_NAME =
ConfigOptions.key("catalog-name")
.stringType()
.noDefaultValue()
.withDescription("Catalog name");

public static final ConfigOption<String> CATALOG_TYPE =
ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
.stringType()
.noDefaultValue()
.withDescription("Catalog type, the optional types are: custom, hadoop, hive.");

public static final ConfigOption<String> CATALOG_DATABASE =
ConfigOptions.key("catalog-database")
.stringType()
.defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
.withDescription("Database name managed in the iceberg catalog.");

public static final ConfigOption<String> CATALOG_TABLE =
ConfigOptions.key("catalog-table")
.stringType()
.noDefaultValue()
.withDescription("Table name managed in the underlying iceberg catalog and database.");
}
@@ -21,7 +21,6 @@
import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
@@ -45,31 +44,6 @@
public class FlinkDynamicTableFactory
implements DynamicTableSinkFactory, DynamicTableSourceFactory {
static final String FACTORY_IDENTIFIER = "iceberg";

private static final ConfigOption<String> CATALOG_NAME =
ConfigOptions.key("catalog-name")
.stringType()
.noDefaultValue()
.withDescription("Catalog name");

private static final ConfigOption<String> CATALOG_TYPE =
ConfigOptions.key(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE)
.stringType()
.noDefaultValue()
.withDescription("Catalog type, the optional types are: custom, hadoop, hive.");

private static final ConfigOption<String> CATALOG_DATABASE =
ConfigOptions.key("catalog-database")
.stringType()
.defaultValue(FlinkCatalogFactory.DEFAULT_DATABASE_NAME)
.withDescription("Database name managed in the iceberg catalog.");

private static final ConfigOption<String> CATALOG_TABLE =
ConfigOptions.key("catalog-table")
.stringType()
.noDefaultValue()
.withDescription("Table name managed in the underlying iceberg catalog and database.");

private final FlinkCatalog catalog;

public FlinkDynamicTableFactory() {
@@ -127,16 +101,16 @@ public DynamicTableSink createDynamicTableSink(Context context) {
@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = Sets.newHashSet();
options.add(CATALOG_TYPE);
options.add(CATALOG_NAME);
options.add(FlinkCreateTableOptions.CATALOG_TYPE);
options.add(FlinkCreateTableOptions.CATALOG_NAME);
return options;
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = Sets.newHashSet();
options.add(CATALOG_DATABASE);
options.add(CATALOG_TABLE);
options.add(FlinkCreateTableOptions.CATALOG_DATABASE);
options.add(FlinkCreateTableOptions.CATALOG_TABLE);
return options;
}

@@ -153,14 +127,17 @@ private static TableLoader createTableLoader(
Configuration flinkConf = new Configuration();
tableProps.forEach(flinkConf::setString);

String catalogName = flinkConf.getString(CATALOG_NAME);
String catalogName = flinkConf.getString(FlinkCreateTableOptions.CATALOG_NAME);
Preconditions.checkNotNull(
catalogName, "Table property '%s' cannot be null", CATALOG_NAME.key());
catalogName,
"Table property '%s' cannot be null",
FlinkCreateTableOptions.CATALOG_NAME.key());

String catalogDatabase = flinkConf.getString(CATALOG_DATABASE, databaseName);
String catalogDatabase =
flinkConf.getString(FlinkCreateTableOptions.CATALOG_DATABASE, databaseName);
Preconditions.checkNotNull(catalogDatabase, "The iceberg database name cannot be null");

String catalogTable = flinkConf.getString(CATALOG_TABLE, tableName);
String catalogTable = flinkConf.getString(FlinkCreateTableOptions.CATALOG_TABLE, tableName);
Preconditions.checkNotNull(catalogTable, "The iceberg table name cannot be null");

org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
@@ -35,6 +35,7 @@
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsSourceWatermark;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;
@@ -53,7 +54,8 @@ public class IcebergTableSource
implements ScanTableSource,
SupportsProjectionPushDown,
SupportsFilterPushDown,
SupportsLimitPushDown {
SupportsLimitPushDown,
SupportsSourceWatermark {
Contributor

I think we have 2 features in a single PR:

  • CREATE TABLE LIKE
  • Watermark support

Could we separate out these features to different PRs?
Could we write tests for both features?

Author
@swapna267 swapna267 Jan 29, 2025

These features were driven mainly by a use case where an Iceberg table needs to be used in Flink window functions. This requires the incoming table to have a MILLISECOND-precision timestamp column and a watermark defined on the source table.

As Iceberg only supports MICROSECOND timestamp columns, we need a table with computed columns, and we can create those only in the Flink catalog. The Iceberg catalog doesn't support creating tables with computed columns.

I am happy to split them into 2 separate PRs.
I have tests for CREATE TABLE LIKE.

As watermark support just makes the source implement the interface, falling back to #9346 for the core logic, I didn't add a test case. I can add a validation for whether watermark-column is configured, so it can fail fast, and a test case around that.
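
To make the use case concrete, here is a minimal end-to-end sketch, assuming an Iceberg table iceberg_catalog.db.events with a MICROSECOND timestamp column t1, the FLIP-27 source enabled, and the source-side watermark configuration from #9346 in place; table and column names are illustrative:

-- enable the FLIP-27 source, which applySourceWatermark() requires
SET 'table.exec.iceberg.use-flip27-source' = 'true';

-- table in the Flink catalog backed by the Iceberg table, adding a
-- MILLISECOND-precision computed column and a source-provided watermark
CREATE TABLE `default_catalog`.`default_database`.`events_like` (
  eventTS AS CAST(t1 AS TIMESTAMP(3)),
  WATERMARK FOR eventTS AS SOURCE_WATERMARK()
) LIKE iceberg_catalog.db.events;

-- the computed column can now drive windowed aggregations
SELECT window_start, window_end, COUNT(*)
FROM TABLE(
  TUMBLE(TABLE `default_catalog`.`default_database`.`events_like`,
         DESCRIPTOR(eventTS), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end;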

Contributor

Please separate out the features into 2 PRs.


private int[] projectedFields;
private Long limit;
Expand Down Expand Up @@ -175,6 +177,14 @@ public Result applyFilters(List<ResolvedExpression> flinkFilters) {
return Result.of(acceptedFilters, flinkFilters);
}

@Override
public void applySourceWatermark() {
if (!readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE)) {
throw new UnsupportedOperationException(
"Source watermarks are supported only in flip-27 iceberg source implementation");
}
}

@Override
public boolean supportsNestedProjection() {
// TODO: support nested projection
Expand Down
@@ -188,6 +188,23 @@ public void testCreateTableLike() throws TableNotExistException {
.isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build());
}

@TestTemplate
public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException {
sql("CREATE TABLE tl(id BIGINT)");

sql("CREATE TABLE `default_catalog`.`default_database`.tl2 LIKE tl");

CatalogTable catalogTable = catalogTable("default_catalog", "default_database", "tl2");
assertThat(catalogTable.getSchema())
.isEqualTo(TableSchema.builder().field("id", DataTypes.BIGINT()).build());

Map<String, String> options = catalogTable.getOptions();
assertThat(options.entrySet().containsAll(config.entrySet())).isTrue();
assertThat(options.get(FlinkCreateTableOptions.CATALOG_NAME.key())).isEqualTo(catalogName);
assertThat(options.get(FlinkCreateTableOptions.CATALOG_DATABASE.key())).isEqualTo(DATABASE);
assertThat(options.get(FlinkCreateTableOptions.CATALOG_TABLE.key())).isEqualTo("tl");
}

@TestTemplate
public void testCreateTableLocation() {
assumeThat(isHadoopCatalog)
@@ -660,10 +677,12 @@ private Table table(String name) {
}

private CatalogTable catalogTable(String name) throws TableNotExistException {
return catalogTable(getTableEnv().getCurrentCatalog(), DATABASE, name);
}

private CatalogTable catalogTable(String catalog, String database, String table)
throws TableNotExistException {
return (CatalogTable)
getTableEnv()
.getCatalog(getTableEnv().getCurrentCatalog())
.get()
.getTable(new ObjectPath(DATABASE, name));
getTableEnv().getCatalog(catalog).get().getTable(new ObjectPath(database, table));
}
}
@@ -256,43 +256,6 @@ public void testCatalogDatabaseConflictWithFlinkDatabase() {
.hasMessageStartingWith("Could not execute CreateTable in path");
}

@TestTemplate
public void testConnectorTableInIcebergCatalog() {
// Create the catalog properties
Map<String, String> catalogProps = Maps.newHashMap();
Comment on lines -259 to -262

Contributor

Why is this test removed?

Author

This is testing the check mentioned in #12116 (comment)

It verifies that creating a table in the Iceberg catalog fails if connector=iceberg is specified in the options. As the check has been deleted, I removed this test case.

Contributor

I think that this is still a valid check in most cases. It is only invalid when the table is created with "CREATE TABLE .. LIKE", and only if the source table is an Iceberg table.
Am I missing something?

catalogProps.put("type", "iceberg");
if (isHiveCatalog()) {
catalogProps.put("catalog-type", "hive");
catalogProps.put(CatalogProperties.URI, CatalogTestBase.getURI(hiveConf));
} else {
catalogProps.put("catalog-type", "hadoop");
}
catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, createWarehouse());

// Create the table properties
Map<String, String> tableProps = createTableProps();

// Create a connector table in an iceberg catalog.
sql("CREATE CATALOG `test_catalog` WITH %s", toWithClause(catalogProps));
try {
assertThatThrownBy(
() ->
sql(
"CREATE TABLE `test_catalog`.`%s`.`%s` (id BIGINT, data STRING) WITH %s",
FlinkCatalogFactory.DEFAULT_DATABASE_NAME,
TABLE_NAME,
toWithClause(tableProps)))
.cause()
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Cannot create the table with 'connector'='iceberg' table property in an iceberg catalog, "
+ "Please create table with 'connector'='iceberg' property in a non-iceberg catalog or "
+ "create table without 'connector'='iceberg' related properties in an iceberg table.");
} finally {
sql("DROP CATALOG IF EXISTS `test_catalog`");
}
}

private Map<String, String> createTableProps() {
Map<String, String> tableProps = Maps.newHashMap(properties);
tableProps.put("catalog-name", catalogName);