Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,9 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoreEnumerator
new LeaseContext(
sourceEnumeratorState.getLeaseId(),
leaseContext.getKvSnapshotLeaseDurationMs()),
true);
true,
sourceEnumeratorState.isInitialDiscoveryFinished(),
sourceEnumeratorState.getUnassignedSplits());
}

@Override
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -70,11 +71,13 @@ public class FlussSourceEnumeratorStateSerializer
private static final int VERSION_0 = 0;
private static final int VERSION_1 = 1;
private static final int VERSION_2 = 2;
private static final int VERSION_3 = 3;
private static final int VERSION_4 = 4;

private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));

private static final int CURRENT_VERSION = VERSION_2;
private static final int CURRENT_VERSION = VERSION_4;

public FlussSourceEnumeratorStateSerializer(LakeSource<LakeSplit> lakeSource) {
this.lakeSource = lakeSource;
Expand All @@ -99,6 +102,12 @@ public byte[] serialize(SourceEnumeratorState state) throws IOException {
// write lease context
serializeLeaseId(out, state);

// write initial discovery finished flag (VERSION_3+)
out.writeBoolean(state.isInitialDiscoveryFinished());

// write unassigned splits (VERSION_4)
serializeUnassignedSplits(out, state.getUnassignedSplits());

final byte[] result = out.getCopyOfBuffer();
out.clear();
return result;
Expand Down Expand Up @@ -169,6 +178,10 @@ protected byte[] serializeV0(SourceEnumeratorState state) throws IOException {
@Override
public SourceEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
switch (version) {
case VERSION_4:
return deserializeV4(serialized);
case VERSION_3:
return deserializeV3(serialized);
case VERSION_2:
return deserializeV2(serialized);
case VERSION_1:
Expand Down Expand Up @@ -229,13 +242,69 @@ private SourceEnumeratorState deserializeV2(byte[] serialized) throws IOExceptio

// deserialize lease context
LeaseContext leaseContext = deserializeLeaseId(in);
// V2 does not have the initialDiscoveryFinished flag; the four-argument constructor used
// below defaults it to false, so the restored state reports initial discovery as unfinished.
return new SourceEnumeratorState(
assignBucketAndPartitions.f0,
assignBucketAndPartitions.f1,
remainingHybridLakeFlussSplits,
leaseContext.getKvSnapshotLeaseId());
}

/**
 * Rebuilds a {@link SourceEnumeratorState} from bytes written in the VERSION_3 layout.
 *
 * <p>The fields are read in order: assigned buckets and partitions, remaining hybrid
 * lake/Fluss splits, the lease id, and the initial-discovery-finished flag. If any bytes remain
 * after the flag, they are read as an int count followed by that many long values and
 * discarded.
 *
 * @param serialized the serialized enumerator state
 * @return the restored state; it is built without unassigned splits
 * @throws IOException if reading from the buffer fails
 */
private SourceEnumeratorState deserializeV3(byte[] serialized) throws IOException {
    final DataInputDeserializer input = new DataInputDeserializer(serialized);

    final Tuple2<Set<TableBucket>, Map<Long, String>> bucketsAndPartitions =
            deserializeAssignBucketAndPartitions(input);
    final List<SourceSplitBase> pendingHybridSplits =
            deserializeRemainingHybridLakeFlussSplits(input);
    final LeaseContext restoredLease = deserializeLeaseId(input);
    final boolean discoveryFinished = input.readBoolean();

    // Older V3 payloads may carry a trailing list of partition ids; consume and drop it.
    if (input.available() > 0) {
        int idsLeft = input.readInt();
        while (idsLeft-- > 0) {
            input.readLong();
        }
    }

    return new SourceEnumeratorState(
            bucketsAndPartitions.f0,
            bucketsAndPartitions.f1,
            pendingHybridSplits,
            restoredLease.getKvSnapshotLeaseId(),
            discoveryFinished);
}

/**
 * Rebuilds a {@link SourceEnumeratorState} from bytes written in the VERSION_4 layout.
 *
 * <p>The fields are read in order: assigned buckets and partitions, remaining hybrid
 * lake/Fluss splits, the lease id, the initial-discovery-finished flag, and the unassigned
 * splits.
 *
 * @param serialized the serialized enumerator state
 * @return the restored state, including its unassigned splits
 * @throws IOException if reading from the buffer fails
 */
private SourceEnumeratorState deserializeV4(byte[] serialized) throws IOException {
    final DataInputDeserializer input = new DataInputDeserializer(serialized);

    final Tuple2<Set<TableBucket>, Map<Long, String>> bucketsAndPartitions =
            deserializeAssignBucketAndPartitions(input);
    final List<SourceSplitBase> pendingHybridSplits =
            deserializeRemainingHybridLakeFlussSplits(input);
    final LeaseContext restoredLease = deserializeLeaseId(input);
    final boolean discoveryFinished = input.readBoolean();
    final List<SourceSplitBase> pendingAssignment = deserializeUnassignedSplits(input);

    return new SourceEnumeratorState(
            bucketsAndPartitions.f0,
            bucketsAndPartitions.f1,
            pendingHybridSplits,
            restoredLease.getKvSnapshotLeaseId(),
            discoveryFinished,
            pendingAssignment);
}

private Tuple2<Set<TableBucket>, Map<Long, String>> deserializeAssignBucketAndPartitions(
DataInputDeserializer in) throws IOException {
// deserialize assigned buckets
Expand Down Expand Up @@ -302,4 +371,37 @@ private LeaseContext deserializeLeaseId(final DataInputDeserializer in) throws I
return new LeaseContext(
kvSnapshotLeaseId, LeaseContext.DEFAULT.getKvSnapshotLeaseDurationMs());
}

/**
 * Writes the unassigned splits to {@code out}.
 *
 * <p>Layout: the split count, and, only when the collection is non-empty, the split serializer
 * version followed by one length-prefixed byte array per split.
 *
 * @param out the target output
 * @param unassignedSplits the splits to write
 * @throws IOException if serializing a split or writing to the output fails
 */
private void serializeUnassignedSplits(
        final DataOutputSerializer out, Collection<SourceSplitBase> unassignedSplits)
        throws IOException {
    out.writeInt(unassignedSplits.size());
    if (unassignedSplits.isEmpty()) {
        // Nothing else is written for an empty collection: no version, no payload.
        return;
    }

    final SourceSplitSerializer splitSerializer = new SourceSplitSerializer(lakeSource);
    out.writeInt(splitSerializer.getVersion());
    for (SourceSplitBase pendingSplit : unassignedSplits) {
        final byte[] payload = splitSerializer.serialize(pendingSplit);
        out.writeInt(payload.length);
        out.write(payload);
    }
}

/**
 * Reads the unassigned splits from {@code in}.
 *
 * <p>Expects the split count first. A count of zero yields an empty list and nothing more is
 * read; otherwise the split serializer version is read, followed by one length-prefixed byte
 * array per split.
 *
 * @param in the input positioned at the start of the unassigned-splits section
 * @return a new list holding the restored splits
 * @throws IOException if reading from the input or deserializing a split fails
 */
private List<SourceSplitBase> deserializeUnassignedSplits(final DataInputDeserializer in)
        throws IOException {
    final int splitCount = in.readInt();
    if (splitCount == 0) {
        return new ArrayList<>();
    }

    final SourceSplitSerializer splitSerializer = new SourceSplitSerializer(lakeSource);
    final int splitVersion = in.readInt();
    final List<SourceSplitBase> restored = new ArrayList<>(splitCount);
    for (int remaining = splitCount; remaining > 0; remaining--) {
        final byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        restored.add(splitSerializer.deserialize(splitVersion, payload));
    }
    return restored;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -44,15 +46,62 @@ public class SourceEnumeratorState {
// lease context for restore.
private final String leaseId;

/**
* Whether the initial partition discovery has been completed. Following FLIP-288, partitions
* discovered after the initial startup always use earliest offsets to prevent data loss.
*/
private final boolean initialDiscoveryFinished;

/**
* Splits that have been initialized (offsets resolved) but not yet assigned to readers.
* Following Kafka's FLIP-288 pattern, these are persisted in checkpoint state so that on
* restore they can be directly assigned without re-initialization, preserving the original
* offset strategy.
*/
private final Collection<SourceSplitBase> unassignedSplits;

public SourceEnumeratorState(
Set<TableBucket> assignedBuckets,
Map<Long, String> assignedPartitions,
@Nullable List<SourceSplitBase> remainingHybridLakeFlussSplits,
String leaseId) {
this(
assignedBuckets,
assignedPartitions,
remainingHybridLakeFlussSplits,
leaseId,
false,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This default is used when restoring old V2 enumerator state, but it leaves initialDiscoveryFinished=false. After an upgrade, partitions created after that old checkpoint can be classified as initial partitions and use the user startup mode (for example latest), which reintroduces the data-loss case this patch is trying to avoid. Could we default old V2 state to true here, or otherwise make the migration preserve post-initial discovery semantics?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If false is changed to true, the initial split—which is not yet fully assigned—will be treated as "earliest", violating the intended semantics. In extreme cases, a checkpoint might be taken before any splits are distributed at all, causing the entire job to restart from the earliest offset upon recovery.

Setting it to false ensures semantic consistency: although there may be a brief period at startup during which data is read from the earliest offset, this behavior aligns with the original design, and normal processing resumes shortly afterward.

Collections.emptyList());
}

/**
 * Creates a state without any unassigned splits.
 *
 * <p>Delegates to the six-argument constructor with an empty list of unassigned splits.
 *
 * @param assignedBuckets the assigned buckets
 * @param assignedPartitions the assigned partitions, keyed by a long id
 * @param remainingHybridLakeFlussSplits the remaining hybrid lake/Fluss splits, may be null
 * @param leaseId the lease id used on restore
 * @param initialDiscoveryFinished whether the initial partition discovery has completed
 */
public SourceEnumeratorState(
        Set<TableBucket> assignedBuckets,
        Map<Long, String> assignedPartitions,
        @Nullable List<SourceSplitBase> remainingHybridLakeFlussSplits,
        String leaseId,
        boolean initialDiscoveryFinished) {
    this(assignedBuckets, assignedPartitions, remainingHybridLakeFlussSplits,
            leaseId, initialDiscoveryFinished, Collections.emptyList());
}

/**
 * Creates a state from all of its components. The arguments are stored as given, without
 * copying.
 *
 * @param assignedBuckets the assigned buckets
 * @param assignedPartitions the assigned partitions, keyed by a long id
 * @param remainingHybridLakeFlussSplits the remaining hybrid lake/Fluss splits, may be null
 * @param leaseId the lease id used on restore
 * @param initialDiscoveryFinished whether the initial partition discovery has completed
 * @param unassignedSplits splits not yet assigned to readers
 */
public SourceEnumeratorState(
        Set<TableBucket> assignedBuckets,
        Map<Long, String> assignedPartitions,
        @Nullable List<SourceSplitBase> remainingHybridLakeFlussSplits,
        String leaseId,
        boolean initialDiscoveryFinished,
        Collection<SourceSplitBase> unassignedSplits) {
    // Discovery and lease bookkeeping.
    this.initialDiscoveryFinished = initialDiscoveryFinished;
    this.leaseId = leaseId;

    // Assignment bookkeeping.
    this.assignedBuckets = assignedBuckets;
    this.assignedPartitions = assignedPartitions;

    // Splits still waiting to be handed out.
    this.remainingHybridLakeFlussSplits = remainingHybridLakeFlussSplits;
    this.unassignedSplits = unassignedSplits;
}

public Set<TableBucket> getAssignedBuckets() {
Expand All @@ -72,6 +121,14 @@ public String getLeaseId() {
return leaseId;
}

/**
 * Returns the initial-discovery flag held by this state.
 *
 * @return {@code true} if the initial partition discovery was recorded as finished
 */
public boolean isInitialDiscoveryFinished() {
    return this.initialDiscoveryFinished;
}

/**
 * Returns the splits held by this state that have not yet been assigned to readers.
 *
 * @return the collection passed at construction time; it is not copied
 */
public Collection<SourceSplitBase> getUnassignedSplits() {
    return this.unassignedSplits;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -85,12 +142,19 @@ public boolean equals(Object o) {
&& Objects.equals(assignedPartitions, that.assignedPartitions)
&& Objects.equals(
remainingHybridLakeFlussSplits, that.remainingHybridLakeFlussSplits)
&& Objects.equals(leaseId, that.leaseId);
&& Objects.equals(leaseId, that.leaseId)
&& initialDiscoveryFinished == that.initialDiscoveryFinished
&& Objects.equals(unassignedSplits, that.unassignedSplits);
}

/**
 * Computes a hash over the state's fields.
 *
 * <p>{@code leaseId} is included because {@code equals} compares it; adding it keeps equal
 * states hashing equally while separating states that differ only by lease id.
 *
 * @return the hash code of this state
 */
@Override
public int hashCode() {
    return Objects.hash(
            assignedBuckets,
            assignedPartitions,
            remainingHybridLakeFlussSplits,
            leaseId,
            initialDiscoveryFinished,
            unassignedSplits);
}

@Override
Expand All @@ -104,6 +168,10 @@ public String toString() {
+ remainingHybridLakeFlussSplits
+ ", leaseId="
+ leaseId
+ ", initialDiscoveryFinished="
+ initialDiscoveryFinished
+ ", unassignedSplits="
+ unassignedSplits
+ '}';
}
}
Loading