CHANGES.md (1 addition, 0 deletions)
@@ -84,6 +84,7 @@

## Bugfixes

* Fixed the Flink classic runner failing with "No translator known for PrimitiveUnboundedRead" when using unbounded source connectors such as KinesisIO after the SDF-to-primitive-read conversion (Java) ([#XXXXX](https://github.com/apache/beam/issues/XXXXX)).
* Fixed FirestoreV1 Beam connectors allowing inconsistent project/database IDs to be configured between RPC requests and routing headers (Java) ([#36895](https://github.com/apache/beam/issues/36895)).
* Logical type and coder registries are now saved for pipelines when using the default pickler. This fixes a side effect of switching to cloudpickle as the default pickler in Beam 2.65.0 (Python) ([#35738](https://github.com/apache/beam/issues/35738)).

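For context, a minimal repro sketch of the Flink failure fixed above. This is a hypothetical pipeline, not taken from the issue report: `CountingSource.unbounded()` stands in for a connector-backed unbounded source such as KinesisIO, and `--experiments=use_deprecated_read` is one way to trigger the SDF-to-primitive-read conversion on the classic runner.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class PrimitiveReadRepro {
  public static void main(String[] args) {
    // e.g. --runner=FlinkRunner --streaming --experiments=use_deprecated_read
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // The conversion rewrites this Read.Unbounded into a
    // SplittableParDo.PrimitiveUnboundedRead; before this fix the Flink
    // classic runner had no translator registered for it and failed with
    // "No translator known for PrimitiveUnboundedRead".
    p.apply("UnboundedRead", Read.from(CountingSource.unbounded()));

    p.run().waitUntilFinish();
  }
}
```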
FlinkStreamingTransformTranslators.java
@@ -174,6 +174,14 @@ class FlinkStreamingTransformTranslators {

public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(
PTransform<?, ?> transform) {
// Handle PrimitiveUnboundedRead explicitly (created by SplittableParDo conversion)
if (transform instanceof SplittableParDo.PrimitiveUnboundedRead) {
return new PrimitiveUnboundedReadTranslator<>();
}
// Handle PrimitiveBoundedRead explicitly
if (transform instanceof SplittableParDo.PrimitiveBoundedRead) {
return new PrimitiveBoundedReadTranslator<>();
}
@Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform);
return urn == null ? null : TRANSLATORS.get(urn);
}
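For reference, a sketch of where these primitive transforms originate. The utility call is the real `SplittableParDo` method named in the translator javadocs below; the surrounding runner wiring is schematic, not the exact call site:

```java
// During pipeline translation, SDF-based reads may be rewritten back into
// primitive reads (e.g. under the use_deprecated_read experiment). This
// produces the SplittableParDo.PrimitiveUnboundedRead and
// SplittableParDo.PrimitiveBoundedRead nodes matched in getTranslator above.
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
```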
@@ -263,6 +271,129 @@ public void translateNode(
}
}

/**
* Translator for {@link SplittableParDo.PrimitiveUnboundedRead}.
*
* <p>This handles the case where Read.Unbounded is converted to PrimitiveUnboundedRead by {@link
* SplittableParDo#convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary}.
*/
private static class PrimitiveUnboundedReadTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
SplittableParDo.PrimitiveUnboundedRead<T>> {

@Override
public void translateNode(
SplittableParDo.PrimitiveUnboundedRead<T> transform,
FlinkStreamingTranslationContext context) {

PCollection<T> output = context.getOutput(transform);

DataStream<WindowedValue<T>> source;
DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
      TypeInformation<WindowedValue<T>> outputTypeInfo = context.getTypeInfo(output);

      Coder<T> coder = output.getCoder();

TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
new CoderTypeInformation<>(
WindowedValues.getFullCoder(
ValueWithRecordId.ValueWithRecordIdCoder.of(coder),
output.getWindowingStrategy().getWindowFn().windowCoder()),
context.getPipelineOptions());

// Get source directly from PrimitiveUnboundedRead (not via ReadTranslation)
UnboundedSource<T, ?> rawSource = transform.getSource();

String fullName = getCurrentTransformName(context);
try {
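        // Derive the split count from maxParallelism when it is configured
        // (> 0), falling back to the current parallelism, matching the
        // existing Read translators in this class.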
int parallelism =
context.getExecutionEnvironment().getMaxParallelism() > 0
? context.getExecutionEnvironment().getMaxParallelism()
: context.getExecutionEnvironment().getParallelism();

FlinkUnboundedSource<T> unboundedSource =
FlinkSource.unbounded(
transform.getName(),
rawSource,
new SerializablePipelineOptions(context.getPipelineOptions()),
parallelism);
nonDedupSource =
context
.getExecutionEnvironment()
.fromSource(
unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo)
.uid(fullName);

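        // Sources that may replay records on restart attach per-record IDs;
        // key by that ID and run values through DedupingOperator so replays
        // do not surface as duplicates downstream.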
if (rawSource.requiresDeduping()) {
source =
nonDedupSource
.keyBy(new ValueWithRecordIdKeySelector<>())
.transform(
"deduping",
outputTypeInfo,
new DedupingOperator<>(context.getPipelineOptions()))
.uid(format("%s/__deduplicated__", fullName));
} else {
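          // No dedup required: strip the record IDs and forward the values.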
source =
nonDedupSource
.flatMap(new StripIdsMap<>(context.getPipelineOptions()))
.returns(outputTypeInfo);
}
} catch (Exception e) {
throw new RuntimeException("Error while translating UnboundedSource: " + rawSource, e);
}

context.setOutputDataStream(output, source);
}
}

/**
* Translator for {@link SplittableParDo.PrimitiveBoundedRead}.
*
* <p>This handles the case where Read.Bounded is converted to PrimitiveBoundedRead by {@link
* SplittableParDo#convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary}.
*/
private static class PrimitiveBoundedReadTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
SplittableParDo.PrimitiveBoundedRead<T>> {

@Override
public void translateNode(
SplittableParDo.PrimitiveBoundedRead<T> transform,
FlinkStreamingTranslationContext context) {

PCollection<T> output = context.getOutput(transform);
      TypeInformation<WindowedValue<T>> outputTypeInfo = context.getTypeInfo(output);

// Get source directly from PrimitiveBoundedRead (not via ReadTranslation)
BoundedSource<T> rawSource = transform.getSource();

String fullName = getCurrentTransformName(context);
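      // Same split-count heuristic as the unbounded translator above.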
int parallelism =
context.getExecutionEnvironment().getMaxParallelism() > 0
? context.getExecutionEnvironment().getMaxParallelism()
: context.getExecutionEnvironment().getParallelism();

FlinkBoundedSource<T> flinkBoundedSource =
FlinkSource.bounded(
fullName,
rawSource,
new SerializablePipelineOptions(context.getPipelineOptions()),
parallelism);

DataStream<WindowedValue<T>> source =
context
.getExecutionEnvironment()
.fromSource(
flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo)
.uid(fullName);

context.setOutputDataStream(output, source);
}
}

static class ValueWithRecordIdKeySelector<T>
implements KeySelector<WindowedValue<ValueWithRecordId<T>>, FlinkKey>,
ResultTypeQueryable<FlinkKey> {
FlinkStreamingTransformTranslatorsTest.java
@@ -18,6 +18,8 @@
package org.apache.beam.runners.flink;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@@ -147,6 +149,88 @@ public void readSourceTranslatorUnboundedWithoutMaxParallelism() {
assertEquals(parallelism, source.getNumSplits());
}

@Test
public void getTranslatorReturnsPrimitiveUnboundedReadTranslator() {
SplittableParDo.PrimitiveUnboundedRead<String> transform =
new SplittableParDo.PrimitiveUnboundedRead<>(Read.from(new TestUnboundedSource()));

FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> translator =
FlinkStreamingTransformTranslators.getTranslator(transform);

assertNotNull("Translator should not be null for PrimitiveUnboundedRead", translator);
}

@Test
public void getTranslatorReturnsPrimitiveBoundedReadTranslator() {
SplittableParDo.PrimitiveBoundedRead<String> transform =
new SplittableParDo.PrimitiveBoundedRead<>(Read.from(new TestBoundedSource(100)));

FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> translator =
FlinkStreamingTransformTranslators.getTranslator(transform);

assertNotNull("Translator should not be null for PrimitiveBoundedRead", translator);
}

@Test
public void primitiveUnboundedReadTranslatorProducesCorrectSource() {
final int maxParallelism = 4;
final int parallelism = 2;

SplittableParDo.PrimitiveUnboundedRead<String> transform =
new SplittableParDo.PrimitiveUnboundedRead<>(Read.from(new TestUnboundedSource()));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.setMaxParallelism(maxParallelism);

// Use getTranslator directly to verify our new translator is used
FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> translator =
FlinkStreamingTransformTranslators.getTranslator(transform);
assertNotNull(translator);

Object sourceTransform =
applyReadSourceTransformWithTranslator(
transform, translator, PCollection.IsBounded.UNBOUNDED, env);

assertTrue(sourceTransform instanceof OneInputTransformation);
OneInputTransformation<?, ?> oneInputTransform = (OneInputTransformation<?, ?>) sourceTransform;

FlinkSource<?, ?> source =
(FlinkSource<?, ?>)
((SourceTransformation<?, ?, ?>)
Iterables.getOnlyElement(oneInputTransform.getInputs()))
.getSource();

assertEquals(maxParallelism, source.getNumSplits());
}

@Test
public void primitiveBoundedReadTranslatorProducesCorrectSource() {
final int maxParallelism = 4;
final int parallelism = 2;

SplittableParDo.PrimitiveBoundedRead<String> transform =
new SplittableParDo.PrimitiveBoundedRead<>(
Read.from(new TestBoundedSource(maxParallelism)));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.setMaxParallelism(maxParallelism);

// Use getTranslator directly to verify our new translator is used
FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> translator =
FlinkStreamingTransformTranslators.getTranslator(transform);
assertNotNull(translator);

Object sourceTransform =
applyReadSourceTransformWithTranslator(
transform, translator, PCollection.IsBounded.BOUNDED, env);

assertTrue(sourceTransform instanceof SourceTransformation);
FlinkBoundedSource<?> source =
(FlinkBoundedSource<?>) ((SourceTransformation<?, ?, ?>) sourceTransform).getSource();

assertEquals(maxParallelism, source.getNumSplits());
}

private Object applyReadSourceTransform(
PTransform<?, ?> transform, PCollection.IsBounded isBounded, StreamExecutionEnvironment env) {

@@ -178,6 +262,40 @@ private Object applyReadSourceTransform(
return ctx.getInputDataStream(pc).getTransformation();
}

@SuppressWarnings("unchecked")
private Object applyReadSourceTransformWithTranslator(
PTransform<?, ?> transform,
FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> translator,
PCollection.IsBounded isBounded,
StreamExecutionEnvironment env) {

FlinkStreamingTranslationContext ctx =
new FlinkStreamingTranslationContext(env, PipelineOptionsFactory.create(), true);

Pipeline pipeline = Pipeline.create();
PCollection<String> pc =
PCollection.createPrimitiveOutputInternal(
pipeline, WindowingStrategy.globalDefault(), isBounded, StringUtf8Coder.of());
pc.setName("output");

Map<TupleTag<?>, PValue> outputs = new HashMap<>();
outputs.put(new TupleTag<>(), pc);
AppliedPTransform<?, ?, ?> appliedTransform =
AppliedPTransform.of(
"test-transform",
Collections.emptyMap(),
PValues.fullyExpand(outputs),
transform,
ResourceHints.create(),
            pipeline);

ctx.setCurrentTransform(appliedTransform);
((FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<?, ?>>) translator)
.translateNode(transform, ctx);

return ctx.getInputDataStream(pc).getTransformation();
}

@SuppressWarnings("unchecked")
private FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<?, ?>>
getReadSourceTranslator() {