Draft: [FLINK-36061] Support Iceberg CDC Pipeline SinkV2 #3877

Open · wants to merge 3 commits into base: master
flink-cdc-pipeline-connector-iceberg/pom.xml
@@ -0,0 +1,43 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>flink-cdc-pipeline-connectors</artifactId>
<groupId>org.apache.flink</groupId>
<version>${revision}</version>
</parent>

<artifactId>flink-cdc-pipeline-connector-iceberg</artifactId>
<packaging>jar</packaging>

<name>flink-cdc-pipeline-connector-iceberg</name>

<properties>
<iceberg.version>1.7.1</iceberg.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-1.19</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-composer</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
@@ -0,0 +1,64 @@
package org.apache.flink.cdc.connectors.iceberg.sink;

import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.sink.EventSinkProvider;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergEventSinkV2;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordSerializer;

import java.io.Serializable;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;

/** A {@link DataSink} for Apache Iceberg that writes CDC events into Iceberg tables. */
public class IcebergDataSink implements DataSink, Serializable {

// options for creating Iceberg catalog.
private final Map<String, String> catalogOptions;

// options for creating Iceberg table.
private final Map<String, String> tableOptions;

private final String commitUser;

private final Map<TableId, List<String>> partitionMaps;

private final IcebergRecordSerializer<Event> serializer;

private final ZoneId zoneId;

public final String schemaOperatorUid;

public IcebergDataSink(
Map<String, String> catalogOptions,
Map<String, String> tableOptions,
String commitUser,
Map<TableId, List<String>> partitionMaps,
IcebergRecordSerializer<Event> serializer,
ZoneId zoneId,
String schemaOperatorUid) {
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.commitUser = commitUser;
this.partitionMaps = partitionMaps;
this.serializer = serializer;
this.zoneId = zoneId;
this.schemaOperatorUid = schemaOperatorUid;
}

@Override
public EventSinkProvider getEventSinkProvider() {
IcebergEventSinkV2 icebergEventSink =
new IcebergEventSinkV2(
tableOptions, commitUser, serializer, schemaOperatorUid, zoneId);
return FlinkSinkProvider.of(icebergEventSink);
}

@Override
public MetadataApplier getMetadataApplier() {
return new IcebergMetadataApplier(catalogOptions, tableOptions, partitionMaps);
}
}
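
As a usage sketch only: the snippet below shows how this sink could be assembled and handed to a pipeline. The option values, commit user, zone, and schema operator UID are illustrative placeholders, and the wrapper class name is hypothetical; only the constructor and the two DataSink methods come from the code above.

import org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSink;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordEventSerializer;

import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;

public class IcebergDataSinkWiringSketch {
    public static void main(String[] args) {
        // Hypothetical catalog options; real values depend on the deployment.
        Map<String, String> catalogOptions = new HashMap<>();
        catalogOptions.put("metastore", "hive");
        catalogOptions.put("uri", "thrift://localhost:9083");

        IcebergDataSink dataSink =
                new IcebergDataSink(
                        catalogOptions,
                        new HashMap<>(), // table creation properties
                        "admin", // commit user (the option's default value)
                        new HashMap<>(), // per-table partition keys
                        new IcebergRecordEventSerializer(new HashMap<>(), ZoneId.of("UTC")),
                        ZoneId.of("UTC"),
                        "schema-operator-uid"); // schema operator UID from the pipeline config

        // The pipeline composer wires the sink through these two entry points.
        dataSink.getEventSinkProvider();
        dataSink.getMetadataApplier();
    }
}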
org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
@@ -0,0 +1,108 @@
package org.apache.flink.cdc.connectors.iceberg.sink;

import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordEventSerializer;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergRecordSerializer;
import org.apache.flink.table.catalog.Catalog;

import org.apache.iceberg.flink.FlinkCatalogFactory;

import java.time.ZoneId;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES;
import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_TABLE_PROPERTIES;

/** A {@link DataSinkFactory} that creates {@link IcebergDataSink} from pipeline sink options. */
public class IcebergDataSinkFactory implements DataSinkFactory {

public static final String IDENTIFIER = "iceberg";

@Override
public DataSink createDataSink(Context context) {
FactoryHelper.createFactoryHelper(this, context)
.validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES);

Map<String, String> allOptions = context.getFactoryConfiguration().toMap();
Map<String, String> catalogOptions = new HashMap<>();
Map<String, String> tableOptions = new HashMap<>();
allOptions.forEach(
(key, value) -> {
if (key.startsWith(PREFIX_TABLE_PROPERTIES)) {
tableOptions.put(key.substring(PREFIX_TABLE_PROPERTIES.length()), value);
} else if (key.startsWith(IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES)) {
catalogOptions.put(
key.substring(
IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES.length()),
value);
}
});
FlinkCatalogFactory factory = new FlinkCatalogFactory();
try {
Catalog catalog =
factory.createCatalog(
catalogOptions.getOrDefault("default-database", "default"),
catalogOptions);
Preconditions.checkNotNull(
catalog.listDatabases(), "Iceberg catalog options are invalid.");
} catch (Exception e) {
throw new RuntimeException("Failed to create or use Iceberg catalog", e);
}
ZoneId zoneId = ZoneId.systemDefault();
if (!Objects.equals(
context.getPipelineConfiguration().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
PipelineOptions.PIPELINE_LOCAL_TIME_ZONE.defaultValue())) {
zoneId =
ZoneId.of(
context.getPipelineConfiguration()
.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
}
String commitUser =
context.getFactoryConfiguration().get(IcebergDataSinkOptions.COMMIT_USER);
IcebergRecordSerializer<Event> serializer =
new IcebergRecordEventSerializer(new HashMap<>(), zoneId);
String schemaOperatorUid =
context.getPipelineConfiguration()
.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID);
return new IcebergDataSink(
catalogOptions,
tableOptions,
commitUser,
new HashMap<>(),
serializer,
zoneId,
schemaOperatorUid);
}

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(IcebergDataSinkOptions.METASTORE);
return options;
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(IcebergDataSinkOptions.WAREHOUSE);
options.add(IcebergDataSinkOptions.URI);
options.add(IcebergDataSinkOptions.COMMIT_USER);
options.add(IcebergDataSinkOptions.PARTITION_KEY);
return options;
}
}
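
To make the prefix routing in createDataSink concrete, here is a standalone sketch with hypothetical option values; only the "catalog.properties." / "table.properties." prefixes and the key names from IcebergDataSinkOptions are taken from the PR, the rest is illustrative.

import java.util.HashMap;
import java.util.Map;

// Standalone illustration of the prefix routing in IcebergDataSinkFactory#createDataSink.
public class OptionRoutingSketch {
    public static void main(String[] args) {
        Map<String, String> allOptions = new HashMap<>();
        allOptions.put("catalog.properties.metastore", "hive");
        allOptions.put("catalog.properties.uri", "thrift://metastore:9083");
        allOptions.put("catalog.properties.warehouse", "hdfs:///warehouse");
        allOptions.put("table.properties.write.format.default", "parquet");
        allOptions.put("commit.user", "admin"); // unprefixed: read directly as a sink option, not routed

        Map<String, String> catalogOptions = new HashMap<>();
        Map<String, String> tableOptions = new HashMap<>();
        allOptions.forEach(
                (key, value) -> {
                    if (key.startsWith("table.properties.")) {
                        tableOptions.put(key.substring("table.properties.".length()), value);
                    } else if (key.startsWith("catalog.properties.")) {
                        catalogOptions.put(key.substring("catalog.properties.".length()), value);
                    }
                });

        // catalogOptions now holds metastore, uri, and warehouse (prefix stripped);
        // tableOptions holds write.format.default.
        System.out.println(catalogOptions);
        System.out.println(tableOptions);
    }
}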
org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
@@ -0,0 +1,47 @@
package org.apache.flink.cdc.connectors.iceberg.sink;

import org.apache.flink.cdc.common.configuration.ConfigOption;

import static org.apache.flink.cdc.common.configuration.ConfigOptions.key;

/** Configuration options for the Iceberg pipeline sink connector. */
public class IcebergDataSinkOptions {

// prefix for passing properties for table creation.
public static final String PREFIX_TABLE_PROPERTIES = "table.properties.";

// prefix for passing properties for catalog creation.
public static final String PREFIX_CATALOG_PROPERTIES = "catalog.properties.";

public static final ConfigOption<String> COMMIT_USER =
key("commit.user")
.stringType()
.defaultValue("admin")
.withDescription("User name for committing data files.");

public static final ConfigOption<String> WAREHOUSE =
key("catalog.properties.warehouse")
.stringType()
.noDefaultValue()
.withDescription("The warehouse root path of the catalog.");

public static final ConfigOption<String> METASTORE =
key("catalog.properties.metastore")
.stringType()
.noDefaultValue()
.withDescription("Metastore of the Iceberg catalog; supported values are 'filesystem' and 'hive'.");

public static final ConfigOption<String> URI =
key("catalog.properties.uri")
.stringType()
.noDefaultValue()
.withDescription("URI of the metastore server.");

public static final ConfigOption<String> PARTITION_KEY =
key("partition.key")
.stringType()
.defaultValue("")
.withDescription(
"Partition keys for each partitioned table; partition keys for multiple tables can be set at once. "
+ "Tables are separated by ';', and partition keys are separated by ','. "
+ "For example, partition keys for two tables can be set with 'testdb.table1:id1,id2;testdb.table2:name'.");
}
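
The partition.key syntax described above implies a parsing step that is not included in this diff. Below is a minimal sketch of one possible parser; the method name and the use of plain String table identifiers (rather than TableId) are assumptions for illustration only.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative parser for the documented partition.key format.
public class PartitionKeySketch {
    static Map<String, List<String>> parsePartitionKeys(String option) {
        Map<String, List<String>> partitionMaps = new HashMap<>();
        if (option == null || option.isEmpty()) {
            return partitionMaps;
        }
        for (String tableSpec : option.split(";")) {
            // Each entry looks like "testdb.table1:id1,id2".
            String[] parts = tableSpec.split(":", 2);
            partitionMaps.put(parts[0], new ArrayList<>(Arrays.asList(parts[1].split(","))));
        }
        return partitionMaps;
    }

    public static void main(String[] args) {
        // Prints the parsed map for two tables (map iteration order is not guaranteed).
        System.out.println(parsePartitionKeys("testdb.table1:id1,id2;testdb.table2:name"));
    }
}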