[flink] Support using an insert-only format for the log system.
liming30 committed Feb 11, 2025
1 parent 6facb71 commit ef87a50
Showing 7 changed files with 119 additions and 14 deletions.
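
At a glance, the commit introduces a log.ignore-delete table option: when it is enabled (or when the table has no primary key), the log system may use an insert-only value format such as json; delete records are then skipped when writing to the log system (they still go to the file store), and the table source reports an insert-only changelog. Below is a minimal usage sketch, not part of the commit: the table name, topic, and servers are made up, and it assumes a Paimon catalog with the Kafka log system on the classpath.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class LogIgnoreDeleteExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Hypothetical primary-key table whose log system uses the plain json format.
            // Without 'log.ignore-delete' = 'true', creating the log sink/source for a
            // primary-key table with 'log.format' = 'json' fails validation, because json
            // only handles INSERT records.
            tEnv.executeSql(
                    "CREATE TABLE orders ("
                            + "  order_id BIGINT,"
                            + "  amount INT,"
                            + "  PRIMARY KEY (order_id) NOT ENFORCED"
                            + ") WITH ("
                            + "  'log.system' = 'kafka',"
                            + "  'kafka.bootstrap.servers' = 'localhost:9092',"
                            + "  'kafka.topic' = 'orders_log',"
                            + "  'log.format' = 'json',"
                            + "  'log.ignore-delete' = 'true'"
                            + ")");
        }
    }

The rejected case (primary key, json value format, option unset) is exactly what the new test below asserts via the "A value format should deal with all records" ValidationException.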

@@ -805,6 +805,13 @@ public class CoreOptions implements Serializable {
.defaultValue("debezium-json")
.withDescription("Specify the message format of log system.");

@ExcludeFromDocumentation("Confusing without a log system")
public static final ConfigOption<Boolean> LOG_IGNORE_DELETE =
key("log.ignore-delete")
.booleanType()
.defaultValue(false)
.withDescription("Specify whether the log system ignores delete records.");

public static final ConfigOption<Boolean> AUTO_CREATE =
key("auto-create")
.booleanType()

@@ -87,7 +87,7 @@ public KafkaLogSourceProvider createSourceProvider(
.createRuntimeDecoder(sourceContext, keyType);
}
DeserializationSchema<RowData> valueDeserializer =
LogStoreTableFactory.getValueDecodingFormat(helper)
LogStoreTableFactory.getValueDecodingFormat(helper, primaryKey.length != 0)
.createRuntimeDecoder(sourceContext, physicalType);
Options options = toOptions(helper.getOptions());
Long timestampMills = options.get(SCAN_TIMESTAMP_MILLIS);
@@ -127,7 +127,7 @@ public KafkaLogSinkProvider createSinkProvider(
.createRuntimeEncoder(sinkContext, keyType);
}
SerializationSchema<RowData> valueSerializer =
LogStoreTableFactory.getValueEncodingFormat(helper)
LogStoreTableFactory.getValueEncodingFormat(helper, primaryKey.length != 0)
.createRuntimeEncoder(sinkContext, physicalType);
Options options = toOptions(helper.getOptions());
return new KafkaLogSinkProvider(

@@ -23,15 +23,21 @@
import org.apache.paimon.types.RowKind;

import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.paimon.CoreOptions.LOG_FORMAT;
import static org.apache.paimon.CoreOptions.LOG_IGNORE_DELETE;
import static org.apache.paimon.flink.FlinkRowData.toFlinkRowKind;
import static org.apache.paimon.flink.kafka.KafkaLogTestUtils.discoverKafkaLogFactory;
import static org.apache.paimon.flink.kafka.KafkaLogTestUtils.testContext;
@@ -69,6 +75,50 @@ public void testUnawareBucket() throws Exception {
checkNonKeyed(LogChangelogMode.ALL, -1, 5, 3);
}

@Test
public void testNonKeyedWithInsertOnlyFormat() throws Exception {
check(
LogChangelogMode.AUTO,
false,
-1,
3,
5,
RowKind.INSERT,
Collections.singletonMap(LOG_FORMAT.key(), "json"));
check(
LogChangelogMode.AUTO,
false,
-1,
3,
5,
RowKind.UPDATE_AFTER,
Collections.singletonMap(LOG_FORMAT.key(), "json"));
}

@Test
public void testKeyedWithInsertOnlyFormat() throws Exception {
Map<String, String> dynamicOptions = new HashMap<>();
dynamicOptions.put(LOG_FORMAT.key(), "json");

assertThatThrownBy(
() ->
check(
LogChangelogMode.AUTO,
true,
-1,
3,
5,
RowKind.INSERT,
dynamicOptions))
.isInstanceOf(ValidationException.class)
.hasMessageContaining(
"A value format should deal with all records. But json has a changelog mode of [INSERT]");

dynamicOptions.put(LOG_IGNORE_DELETE.key(), "true");
check(LogChangelogMode.AUTO, true, -1, 3, 5, RowKind.INSERT, dynamicOptions);
check(LogChangelogMode.AUTO, true, -1, 3, 5, RowKind.UPDATE_AFTER, dynamicOptions);
}

private void checkKeyed(LogChangelogMode mode, int bucket, int key, int value)
throws Exception {
check(mode, true, bucket, key, value, RowKind.INSERT);
@@ -88,11 +138,23 @@ private void checkNonKeyed(LogChangelogMode mode, int bucket, int key, int value
private void check(
LogChangelogMode mode, boolean keyed, int bucket, int key, int value, RowKind rowKind)
throws Exception {
check(mode, keyed, bucket, key, value, rowKind, Collections.emptyMap());
}

private void check(
LogChangelogMode mode,
boolean keyed,
int bucket,
int key,
int value,
RowKind rowKind,
Map<String, String> dynamicOptions)
throws Exception {
KafkaLogSerializationSchema serializer =
createTestSerializationSchema(testContext("", mode, keyed));
createTestSerializationSchema(testContext("", mode, keyed, dynamicOptions));
serializer.open(null);
KafkaRecordDeserializationSchema<RowData> deserializer =
createTestDeserializationSchema(testContext("", mode, keyed));
createTestDeserializationSchema(testContext("", mode, keyed, dynamicOptions));
deserializer.open(null);

SinkRecord input = testRecord(keyed, bucket, key, value, rowKind);

@@ -175,23 +175,38 @@ static ResolvedCatalogTable createResolvedTable(

public static DynamicTableFactory.Context testContext(
String servers, LogChangelogMode changelogMode, boolean keyed) {
return testContext("table", servers, changelogMode, LogConsistency.TRANSACTIONAL, keyed);
return testContext(servers, changelogMode, keyed, Collections.emptyMap());
}

public static DynamicTableFactory.Context testContext(
String servers,
LogChangelogMode changelogMode,
boolean keyed,
Map<String, String> dynamicOptions) {
return testContext(
"table",
servers,
changelogMode,
LogConsistency.TRANSACTIONAL,
keyed,
dynamicOptions);
}

static DynamicTableFactory.Context testContext(
String name,
String servers,
LogChangelogMode changelogMode,
LogConsistency consistency,
boolean keyed) {
boolean keyed,
Map<String, String> dynamicOptions) {
return testContext(
name,
servers,
changelogMode,
consistency,
RowType.of(new IntType(), new IntType()),
keyed ? new int[] {0} : new int[0],
new HashMap<>());
dynamicOptions);
}

public static DynamicTableFactory.Context testContext(

@@ -45,6 +45,7 @@
import javax.annotation.Nullable;

import static org.apache.paimon.CoreOptions.LOG_FORMAT;
import static org.apache.paimon.CoreOptions.LOG_IGNORE_DELETE;
import static org.apache.paimon.CoreOptions.LOG_KEY_FORMAT;

/**
@@ -100,6 +101,12 @@ static ConfigOption<String> logFormat() {
.defaultValue(LOG_FORMAT.defaultValue());
}

static ConfigOption<Boolean> logIgnoreDelete() {
return ConfigOptions.key(LOG_IGNORE_DELETE.key())
.booleanType()
.defaultValue(LOG_IGNORE_DELETE.defaultValue());
}

static LogStoreTableFactory discoverLogStoreFactory(ClassLoader cl, String identifier) {
return FactoryUtil.discoverFactory(cl, LogStoreTableFactory.class, identifier);
}
@@ -121,18 +128,20 @@ static EncodingFormat<SerializationSchema<RowData>> getKeyEncodingFormat(
}

static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
FlinkTableFactoryHelper helper) {
FlinkTableFactoryHelper helper, boolean hasPrimaryKey) {
DecodingFormat<DeserializationSchema<RowData>> format =
helper.discoverDecodingFormat(DeserializationFormatFactory.class, logFormat());
validateValueFormat(format, helper.getOptions().get(logFormat()));
boolean insertOnly = !hasPrimaryKey || helper.getOptions().get(logIgnoreDelete());
validateValueFormat(format, helper.getOptions().get(logFormat()), insertOnly);
return format;
}

static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
FlinkTableFactoryHelper helper) {
FlinkTableFactoryHelper helper, boolean hasPrimaryKey) {
EncodingFormat<SerializationSchema<RowData>> format =
helper.discoverEncodingFormat(SerializationFormatFactory.class, logFormat());
validateValueFormat(format, helper.getOptions().get(logFormat()));
boolean insertOnly = !hasPrimaryKey || helper.getOptions().get(logIgnoreDelete());
validateValueFormat(format, helper.getOptions().get(logFormat()), insertOnly);
return format;
}

@@ -146,8 +155,8 @@ static void validateKeyFormat(Format format, String name) {
}
}

static void validateValueFormat(Format format, String name) {
if (!format.getChangelogMode().equals(ChangelogMode.all())) {
static void validateValueFormat(Format format, String name, boolean insertOnly) {
if (!insertOnly && !format.getChangelogMode().equals(ChangelogMode.all())) {
throw new ValidationException(
String.format(
"A value format should deal with all records. "
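
For context (illustrative, not part of the commit): a CDC format such as debezium-json reports a changelog mode covering all row kinds and always passes the check above, while a plain format such as json reports only INSERT. The relaxed rule can be summarized in a small standalone sketch, assuming the format's mode comes from Format#getChangelogMode():

    import org.apache.flink.table.api.ValidationException;
    import org.apache.flink.table.connector.ChangelogMode;

    class ValueFormatCheckSketch {
        // Mirrors the relaxed validation: an insert-only value format is acceptable
        // when the table has no primary key or log.ignore-delete is enabled.
        static void validate(
                String name, ChangelogMode formatMode, boolean hasPrimaryKey, boolean logIgnoreDelete) {
            boolean insertOnly = !hasPrimaryKey || logIgnoreDelete;
            if (!insertOnly && !formatMode.equals(ChangelogMode.all())) {
                throw new ValidationException(
                        "A value format should deal with all records. But "
                                + name + " has a changelog mode of " + formatMode);
            }
        }
    }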

@@ -20,6 +20,7 @@

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.log.LogWriteCallback;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;

@@ -49,6 +50,8 @@
import java.util.List;
import java.util.Objects;

import static org.apache.paimon.CoreOptions.LOG_IGNORE_DELETE;

/** A {@link PrepareCommitOperator} to write {@link InternalRow}. Record schema is fixed. */
public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {

@@ -57,6 +60,7 @@ public class RowDataStoreWriteOperator extends TableWriteOperator<InternalRow> {
@Nullable private final LogSinkFunction logSinkFunction;
private transient SimpleContext sinkContext;
@Nullable private transient LogWriteCallback logCallback;
private transient boolean logIgnoreDelete;

/** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
private long currentWatermark = Long.MIN_VALUE;
@@ -97,6 +101,7 @@ public void open() throws Exception {
openFunction(logSinkFunction);
logCallback = new LogWriteCallback();
logSinkFunction.setWriteCallback(logCallback);
logIgnoreDelete = Options.fromMap(table.options()).get(LOG_IGNORE_DELETE);
}
}

@@ -139,7 +144,9 @@ record = write.write(element.getValue());
throw new IOException(e);
}

if (record != null && logSinkFunction != null) {
if (record != null
&& logSinkFunction != null
&& (!logIgnoreDelete || record.row().getRowKind().isAdd())) {
// write to log store, need to preserve original pk (which includes partition fields)
SinkRecord logRecord = write.toLogRecord(record);
logSinkFunction.invoke(logRecord, sinkContext);
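
On the write path, the new guard means that with log.ignore-delete enabled only additive rows are forwarded to the log sink; retraction rows still reach the file store. A small illustration of which row kinds pass the RowKind#isAdd() filter (assuming Paimon's RowKind semantics, where isAdd() holds for INSERT and UPDATE_AFTER):

    import org.apache.paimon.types.RowKind;

    class RowKindFilterSketch {
        public static void main(String[] args) {
            for (RowKind kind : RowKind.values()) {
                // Expected: INSERT and UPDATE_AFTER -> true; UPDATE_BEFORE and DELETE -> false.
                System.out.println(kind + " forwarded to the log system: " + kind.isAdd());
            }
        }
    }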

@@ -65,6 +65,7 @@
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
import static org.apache.paimon.CoreOptions.LOG_IGNORE_DELETE;
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC_THREAD_NUMBER;
@@ -149,6 +150,10 @@ public ChangelogMode getChangelogMode() {
return ChangelogMode.all();
}

if (logStoreTableFactory != null && options.get(LOG_IGNORE_DELETE)) {
return ChangelogMode.insertOnly();
}

// optimization: transaction consistency and all changelog mode avoid the generation of
// normalized nodes. See FlinkTableSink.getChangelogMode validation.
return options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL
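
On the read path, the branch above makes the source declare an insert-only changelog when a log system is configured and log.ignore-delete is set, so a streaming query over such a table needs no retraction handling downstream. A hypothetical follow-up to the earlier DDL sketch:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    class InsertOnlyReadSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // 'orders' is the hypothetical table from the earlier sketch, registered in a
            // Paimon catalog; the streaming result contains only +I rows, since deletes
            // were never written to the log system.
            tEnv.executeSql("SELECT * FROM orders").print();
        }
    }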
