Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* used as the end of the range to indicate infinity.
*
* <p>An offset range is considered growable when the end offset could grow (or change) during
* execution time (e.g., Kafka topic partition offset, appended file, ...).
* execution time (e.g., appended file, ...).
*
* <p>The growable range is marked as done by claiming {@code Long.MAX_VALUE}.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms.splittabledofn;

/**
 * A {@link RestrictionTracker} for wrapping a {@link RestrictionTracker} with unsplittable
 * restrictions.
 *
 * <p>A restriction is considered unsplittable when restrictions of an element must not be processed
 * simultaneously (e.g., Kafka topic partition).
 *
 * <p>Every operation is forwarded to the wrapped tracker. The single exception is {@link
 * #trySplit(double)}, which refuses any request whose fraction lies strictly between {@code 0.0}
 * and {@code 1.0}, because such a request would carve the remaining work into two parts. A
 * checkpoint request (fraction {@code 0.0}) is still forwarded so that processing can be paused and
 * resumed later.
 *
 * @param <RestrictionT> the restriction type of the wrapped tracker
 * @param <PositionT> the position type claimed on the wrapped tracker
 */
@SuppressWarnings({
  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class UnsplittableRestrictionTracker<RestrictionT, PositionT>
    extends RestrictionTracker<RestrictionT, PositionT> implements RestrictionTracker.HasProgress {
  /** The tracker that performs the actual claiming, splitting and progress bookkeeping. */
  private final RestrictionTracker<RestrictionT, PositionT> tracker;

  /**
   * Wraps {@code tracker} so that it can no longer be split into concurrently processed parts.
   *
   * @param tracker the tracker to delegate to
   */
  public UnsplittableRestrictionTracker(RestrictionTracker<RestrictionT, PositionT> tracker) {
    this.tracker = tracker;
  }

  /** Forwards the claim to the wrapped tracker and returns its verdict. */
  @Override
  public boolean tryClaim(PositionT position) {
    return tracker.tryClaim(position);
  }

  /** Returns the wrapped tracker's current restriction. */
  @Override
  public RestrictionT currentRestriction() {
    return tracker.currentRestriction();
  }

  /**
   * Splits the wrapped tracker only when the request cannot lead to two parts of the remaining
   * work existing side by side.
   *
   * @param fractionOfRemainder the requested fraction of the remaining work to keep in the primary
   * @return {@code null} when {@code fractionOfRemainder} is strictly between {@code 0.0} and
   *     {@code 1.0}; otherwise whatever the wrapped tracker returns for the same request
   */
  @Override
  public SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
    // A fraction of 0.0 is a checkpoint request and must reach the wrapped tracker; rejecting it
    // would make the restriction impossible to checkpoint. Only genuine mid-range splits are
    // refused.
    if (fractionOfRemainder > 0.0 && fractionOfRemainder < 1.0) {
      return null;
    }
    return tracker.trySplit(fractionOfRemainder);
  }

  /**
   * Forwards the completion check to the wrapped tracker.
   *
   * @throws IllegalStateException if the wrapped tracker reports that work is left unclaimed
   */
  @Override
  public void checkDone() throws IllegalStateException {
    tracker.checkDone();
  }

  /** Returns the boundedness reported by the wrapped tracker. */
  @Override
  public IsBounded isBounded() {
    return tracker.isBounded();
  }

  /**
   * Returns the wrapped tracker's progress when it implements {@link
   * RestrictionTracker.HasProgress}, and {@code Progress.NONE} otherwise.
   */
  @Override
  public Progress getProgress() {
    return tracker instanceof RestrictionTracker.HasProgress
        ? ((RestrictionTracker.HasProgress) tracker).getProgress()
        : Progress.NONE;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.UnsplittableRestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
Expand Down Expand Up @@ -108,6 +109,12 @@
*
* <h4>Splitting</h4>
*
* <p>Consumer groups must not consume from the same {@link TopicPartition} simultaneously. Doing so
* may arbitrarily overwrite a consumer group's committed offset for a {@link TopicPartition}.
* Restriction trackers for a {@link KafkaSourceDescriptor} are wrapped as {@link
* UnsplittableRestrictionTracker UnsplittableRestrictionTracker&lt;OffsetRange, Long&gt;} and will
* only return a non-null {@link SplitResult} for a checkpoint.
*
* <p>TODO(https://github.com/apache/beam/issues/20280): Add support for initial splitting.
*
* <h4>Checkpoint and Resume Processing</h4>
Expand Down Expand Up @@ -488,20 +495,21 @@ public double getSize(

@NewTracker
@RequiresNonNull({"latestOffsetEstimatorCache"})
public OffsetRangeTracker restrictionTracker(
public UnsplittableRestrictionTracker<OffsetRange, Long> restrictionTracker(
@Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;

// An end offset below Long.MAX_VALUE is a fixed end, so a plain OffsetRangeTracker is used and no
// latest-offset estimator is looked up.
if (restriction.getTo() < Long.MAX_VALUE) {
return new OffsetRangeTracker(restriction);
return new UnsplittableRestrictionTracker<>(new OffsetRangeTracker(restriction));
}

// OffsetEstimators are cached per topic-partition because each one holds a stateful connection,
// and we want to minimize the number of connections that we open and track with Kafka. In
// addition, each estimator memoizes its backlog, so caching makes those estimates reusable.
return new GrowableOffsetRangeTracker(
restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor));
return new UnsplittableRestrictionTracker<>(
new GrowableOffsetRangeTracker(
restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor)));
}

@ProcessElement
Expand Down
Loading