[flink]Newly discovered partition read from earliest rather than scan.startup.mode#3548
[flink]Newly discovered partition read from earliest rather than scan.startup.mode#3548loserwang1024 wants to merge 1 commit into
Conversation
|
|
||
| initialDiscoveryFinished = true; | ||
| for (SourceSplitBase split : splits) { | ||
| unassignedSplits.put(split.getTableBucket(), split); |
There was a problem hiding this comment.
This stores unassigned splits by TableBucket, but TableBucket is not unique for all split types. LakeSnapshotSplit uses splitIndex to distinguish multiple lake splits in the same bucket, so checkpointing only unassignedSplits.values() can drop all but one split (or fail on restore with duplicate keys). Could we persist these by splitId or as a list instead of de-duplicating by TableBucket?
| assignedPartitions, | ||
| remainingHybridLakeFlussSplits, | ||
| leaseId, | ||
| false, |
There was a problem hiding this comment.
This default is used when restoring old V2 enumerator state, but it leaves initialDiscoveryFinished=false. After an upgrade, partitions created after that old checkpoint can be classified as initial partitions and use the user startup mode (for example latest), which reintroduces the data-loss case this patch is trying to avoid. Could we default old V2 state to true here, or otherwise make the migration preserve post-initial discovery semantics?
Purpose
Linked issue: close #3543
Brief change log
Tests
API and Format
Documentation