Skip to content

Commit 3473e2d

Browse files
authored
[Feature][Flink] Support CDC source schema evolution on Flink engine (#9867)
1 parent 6d9e0a1 commit 3473e2d

File tree

25 files changed

+1978
-16
lines changed

25 files changed

+1978
-16
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/event/EventType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,5 @@ public enum EventType {
3131
LIFECYCLE_WRITER_CLOSE,
3232
READER_MESSAGE_DELAYED,
3333
JOB_STATUS,
34+
SCHEMA_CHANGE_FLUSH,
3435
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SupportSchemaEvolutionSinkWriter.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.seatunnel.api.sink;
1919

20+
import org.apache.seatunnel.api.table.coordinator.SchemaCoordinator;
21+
import org.apache.seatunnel.api.table.schema.event.FlushEvent;
2022
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
2123

2224
import java.io.IOException;
@@ -30,4 +32,48 @@ public interface SupportSchemaEvolutionSinkWriter {
3032
* @throws IOException
3133
*/
3234
void applySchemaChange(SchemaChangeEvent event) throws IOException;
35+
36+
/**
37+
* handle FlushEvent propagated from upstream
38+
*
39+
* @param event
40+
* @throws IOException
41+
*/
42+
default void handleFlushEvent(FlushEvent event) throws IOException {
43+
flushData();
44+
sendFlushSuccessful(event);
45+
}
46+
47+
/**
48+
* send success event to coordinator upon successful flash
49+
*
50+
* @param event
51+
* @throws IOException
52+
*/
53+
default void sendFlushSuccessful(FlushEvent event) throws IOException {
54+
SchemaCoordinator coordinator = getSchemaCoordinator();
55+
if (coordinator == null && event != null && event.getJobId() != null) {
56+
coordinator = SchemaCoordinator.getOrCreateInstance(event.getJobId());
57+
}
58+
59+
if (coordinator != null) {
60+
coordinator.notifyFlushSuccessful(event.getJobId(), event.tableIdentifier());
61+
}
62+
}
63+
64+
/**
65+
* Get the schema coordinator instance for reporting flush completion
66+
*
67+
* @return the schema coordinator instance, or null if not available
68+
*/
69+
default SchemaCoordinator getSchemaCoordinator() {
70+
return null;
71+
}
72+
73+
/**
74+
* flush data to other system
75+
*
76+
* @throws IOException
77+
*/
78+
default void flushData() throws IOException {}
3379
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.seatunnel.api.sink.SinkWriter;
2222
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
2323
import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
24+
import org.apache.seatunnel.api.table.schema.event.FlushEvent;
2425
import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
2526
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2627
import org.apache.seatunnel.api.tracing.MDCTracer;
@@ -175,8 +176,36 @@ public void applySchemaChange(SchemaChangeEvent event) throws IOException {
175176
}
176177
}
177178

179+
@Override
180+
public void handleFlushEvent(FlushEvent event) throws IOException {
181+
subSinkErrorCheck();
182+
String targetTableId = event.tableIdentifier().toTablePath().getFullName();
183+
for (int i = 0; i < sinkWritersWithIndex.size(); i++) {
184+
for (Map.Entry<SinkIdentifier, SinkWriter<SeaTunnelRow, ?, ?>> sinkWriterEntry :
185+
sinkWritersWithIndex.get(i).entrySet()) {
186+
if (sinkWriterEntry.getKey().getTableIdentifier().equals(targetTableId)) {
187+
synchronized (runnable.get(i)) {
188+
SinkWriter<SeaTunnelRow, ?, ?> sink = sinkWriterEntry.getValue();
189+
if (sink instanceof SupportSchemaEvolutionSinkWriter) {
190+
((SupportSchemaEvolutionSinkWriter) sink).handleFlushEvent(event);
191+
}
192+
}
193+
return;
194+
}
195+
}
196+
}
197+
}
198+
178199
@Override
179200
public void write(SeaTunnelRow element) throws IOException {
201+
if (element != null && element.getOptions() != null) {
202+
if (element.getOptions().containsKey("flush_event")
203+
|| element.getOptions().containsKey("schema_change_event")) {
204+
log.debug("Skipping schema change event row: {}", element.getOptions().keySet());
205+
return;
206+
}
207+
}
208+
180209
if (!submitted) {
181210
submitted = true;
182211
runnable.forEach(executorService::submit);

0 commit comments

Comments
 (0)