Skip to content

Conversation

@dnamaz
Copy link

@dnamaz dnamaz commented Dec 6, 2025

Pull Request: Add PrimitiveUnboundedRead/PrimitiveBoundedRead Translators to Flink Runner

Title

[Flink Runner] Add translators for PrimitiveUnboundedRead and PrimitiveBoundedRead

Description

What is this PR doing?

This PR adds explicit translators for SplittableParDo.PrimitiveUnboundedRead and SplittableParDo.PrimitiveBoundedRead to the Flink streaming transform translators. These translators handle the case where Read.Unbounded and Read.Bounded are converted to primitive reads by SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary().

Why is this needed?

The Flink classic runner (non-portable) calls convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary() when NOT using the beam_fn_api experiment. This converts SDF-wrapped reads to PrimitiveUnboundedRead and PrimitiveBoundedRead transforms. However, there were no registered translators for these transforms, causing pipelines using unbounded sources (like KinesisIO.read()) to fail with:

java.lang.IllegalStateException: No translator known for org.apache.beam.sdk.util.construction.SplittableParDo$PrimitiveUnboundedRead

How does it work?

The new translators:

  1. PrimitiveUnboundedReadTranslator: Extracts the UnboundedSource from PrimitiveUnboundedRead.getSource() and creates a FlinkUnboundedSource, following the same pattern as the existing UnboundedReadSourceTranslator.

  2. PrimitiveBoundedReadTranslator: Extracts the BoundedSource from PrimitiveBoundedRead.getSource() and creates a FlinkBoundedSource, following the same pattern as the existing BoundedReadSourceTranslator.

The key difference from the existing translators is that they retrieve the source directly from the transform (transform.getSource()) rather than using ReadTranslation.unboundedSourceFromTransform(), since PrimitiveUnboundedRead and PrimitiveBoundedRead are not standard Read transforms with URNs.

Changes

  • FlinkStreamingTransformTranslators.java:

    • Added PrimitiveUnboundedReadTranslator<T> class
    • Added PrimitiveBoundedReadTranslator<T> class
    • Modified getTranslator() to check for PrimitiveUnboundedRead and PrimitiveBoundedRead instances before URN lookup
  • FlinkStreamingTransformTranslatorsTest.java:

    • Added getTranslatorReturnsPrimitiveUnboundedReadTranslator() test
    • Added getTranslatorReturnsPrimitiveBoundedReadTranslator() test
    • Added primitiveUnboundedReadTranslatorProducesCorrectSource() test
    • Added primitiveBoundedReadTranslatorProducesCorrectSource() test
  • CHANGES.md:

    • Added bugfix entry for 2.71.0

Issue

Fixes #37035

Created issue: #37035
Related to #20530 (Use SDF read as default)

Checklist

  • Code formatted with ./gradlew :runners:flink:1.18:spotlessApply
  • Unit tests added in FlinkStreamingTransformTranslatorsTest.java
  • All Flink runner tests pass (./gradlew :runners:flink:1.18:test)
  • CHANGES.md updated and formatted with ./gradlew formatChanges
  • No breaking changes to public API

Testing

Unit Tests

./gradlew :runners:flink:1.18:test
# BUILD SUCCESSFUL - all tests pass

Integration Testing

Tested on AWS Managed Apache Flink with a real pipeline using KinesisIO.read():

  1. Before fix: Pipeline fails during translation with "No translator known for PrimitiveUnboundedRead"
  2. After fix: Pipeline successfully translates and runs, reading records from Kinesis

Test environment:

  • AWS Managed Apache Flink (FLINK-1_18 runtime)
  • Apache Beam 2.71.0-SNAPSHOT (with this fix)
  • KinesisIO.read() connector

Backwards Compatibility

This change is fully backwards compatible:

  1. No public API changes: Only internal translator classes are added
  2. No behavior changes for existing code: The new translators only activate when PrimitiveUnboundedRead or PrimitiveBoundedRead transforms are present (which previously caused failures)
  3. Existing URN-based translation unchanged: Standard Read.Bounded and Read.Unbounded with URNs continue to use the existing ReadSourceTranslator

Performance

No performance impact expected. The new translators use the same FlinkUnboundedSource and FlinkBoundedSource implementations as the existing translators.

…ink streaming runner

This PR adds explicit translators for SplittableParDo.PrimitiveUnboundedRead
and SplittableParDo.PrimitiveBoundedRead to the Flink streaming runner,
enabling unbounded sources like KinesisIO.read() to work with the classic
Flink runner on AWS Managed Flink.

Problem:
- Read.Unbounded.expand() wraps sources with UnboundedSourceAsSDFWrapperFn
- FlinkRunner converts SDF-based reads to PrimitiveUnboundedRead
- FlinkStreamingTransformTranslators.getTranslator() had no translator for
  PrimitiveUnboundedRead, causing 'No translator known' errors

Solution:
- Add PrimitiveUnboundedReadTranslator that extracts UnboundedSource directly
  from transform.getSource() instead of using ReadTranslation
- Add PrimitiveBoundedReadTranslator for bounded sources
- Modify getTranslator() to check for these transform types explicitly

Testing:
- Added 4 new unit tests
- All 383 Flink runner tests pass
…veBoundedRead

This commit adds explicit translators for SplittableParDo.PrimitiveUnboundedRead
and SplittableParDo.PrimitiveBoundedRead to the Flink streaming transform
translators.

The Flink classic runner calls convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary()
when NOT using beam_fn_api experiment, which converts SDF-wrapped reads to
PrimitiveUnboundedRead and PrimitiveBoundedRead. Without translators for these
transforms, pipelines using unbounded sources like KinesisIO.read() fail with:
'No translator known for PrimitiveUnboundedRead'

Changes:
- Add PrimitiveUnboundedReadTranslator class
- Add PrimitiveBoundedReadTranslator class
- Modify getTranslator() to handle these transforms before URN lookup
- Add unit tests for the new translators
- Update CHANGES.md with bugfix entry

Related to apache#20530
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @dnamaz, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves a critical issue in the Flink classic runner where pipelines using certain read operations, specifically those internally converted to "PrimitiveUnboundedRead" or "PrimitiveBoundedRead" by "SplittableParDo", would fail during translation due to the absence of appropriate translators. By implementing dedicated translators for these primitive read types, the change ensures that such pipelines, including those leveraging unbounded sources like KinesisIO, can now execute successfully, thereby enhancing the robustness and compatibility of the Flink runner.

Highlights

  • Flink Runner Bug Fix: Addresses an "IllegalStateException" in the Flink classic runner when using unbounded sources (e.g., KinesisIO.read()) without the "beam_fn_api" experiment, caused by missing translators for "PrimitiveUnboundedRead" and "PrimitiveBoundedRead" transforms.
  • New Translators Added: Introduces "PrimitiveUnboundedReadTranslator" and "PrimitiveBoundedReadTranslator" to handle the conversion of "Read.Unbounded" and "Read.Bounded" into their primitive forms by "SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary()".
  • Translation Mechanism: The new translators directly extract the "UnboundedSource" or "BoundedSource" from the respective primitive read transform, bypassing the URN-based translation used for standard "Read" transforms.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@github-actions
Copy link
Contributor

github-actions bot commented Dec 7, 2025

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@dnamaz
Copy link
Author

dnamaz commented Dec 8, 2025

assign set of reviewers

There is a problem with one of the tests, it is happening in this file:

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java

on line 302,

public static <T> Block<T> fromValues(List<T> values, @Nullable ByteString nextToken) { if (values.isEmpty() && nextToken == null) { return emptyBlock(); } ImmutableList<T> immutableValues = ImmutableList.copyOf(values); long listWeight = immutableValues.size() * Caches.REFERENCE_SIZE; for (T value : immutableValues) { listWeight = LongMath.saturatedAdd(listWeight, Caches.weigh(value)); } return fromValues(immutableValues, listWeight, nextToken); }

will throw a null pointer exception if the value is null. I believe this is causing the issue with the PR check failing in the test.

testSideInputWithNullDefault,

should this be fixed and expected in the test or modify the collection to use a null safe list? should it be part of this PR or create another bug? Does this currently happen in other PR checks? Am i missing something?

The call chain is:
ViewTest.processElement (line 824) calls c.sideInput(view)
→ FnApiDoFnRunner.sideInput()
→ FnApiStateAccessor.get()
→ SingletonViewFn2.apply() iterates through the side input
→ CachingStateIterator.hasNext() loads the next block
→ Block.fromValues() at line 315 - calls ImmutableList.copyOf() which fails on null

@dnamaz
Copy link
Author

dnamaz commented Dec 8, 2025

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request correctly adds translators for PrimitiveUnboundedRead and PrimitiveBoundedRead to the Flink runner, fixing a crash when using unbounded sources without the beam_fn_api experiment. The changes are well-structured and include thorough tests.

My main feedback is regarding code duplication in the new translator classes. Both PrimitiveUnboundedReadTranslator and PrimitiveBoundedReadTranslator share a significant amount of code with their existing counterparts (UnboundedReadSourceTranslator and BoundedReadSourceTranslator). I've suggested refactoring these to use common helper methods to improve maintainability.

Additionally, PrimitiveBoundedReadTranslator appears to be missing some important logic from BoundedReadSourceTranslator for error handling and batch execution, which could lead to issues. I've provided a code suggestion to address this.

Finally, there's a minor update needed in CHANGES.md to replace the placeholder issue number.

Overall, this is a great fix, and with the suggested improvements, the code will be more robust and maintainable.

dnamaz and others added 2 commits December 8, 2025 16:55
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
- Add translateUnboundedSource() helper method for common unbounded source translation logic
- Add translateBoundedSource() helper method for common bounded source translation logic
- Simplify UnboundedReadSourceTranslator to use helper method
- Simplify PrimitiveUnboundedReadTranslator to use helper method
- Simplify BoundedReadSourceTranslator to use helper method
- Simplify PrimitiveBoundedReadTranslator to use helper method
- Add missing try-catch, .returns(), and batch mode slot sharing to PrimitiveBoundedReadTranslator

This refactoring reduces ~100 lines of duplicated code and ensures
consistent behavior across all read translators.
@dnamaz
Copy link
Author

dnamaz commented Dec 9, 2025

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request correctly addresses a failure in the Flink classic runner by adding translators for PrimitiveUnboundedRead and PrimitiveBoundedRead. The changes are well-structured, including a refactoring that extracts common source translation logic into helper methods, which improves code clarity and reduces duplication. The new translators correctly handle the primitive read transforms that are generated when SplittableParDo converts SDF-wrapped reads. The accompanying tests are thorough and validate the new functionality effectively. I have one minor suggestion to further reduce code duplication.

Extract the duplicated parallelism calculation logic from translateUnboundedSource
and translateBoundedSource into a shared private helper method.
@dnamaz
Copy link
Author

dnamaz commented Dec 9, 2025

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request effectively addresses a critical bug in the Flink classic runner where pipelines with unbounded sources would fail due to missing translators for PrimitiveUnboundedRead and PrimitiveBoundedRead. The solution of adding explicit translators for these types is direct and correct.

I appreciate the refactoring you've done to extract common logic for bounded and unbounded source translation into translateBoundedSource and translateUnboundedSource helper methods. This significantly improves code clarity and maintainability by reducing duplication. The accompanying unit tests are comprehensive, verifying that the correct translators are selected and that they produce the expected Flink source transformations.

I have one minor suggestion to further reduce code duplication. Overall, this is a high-quality contribution that resolves an important issue.

Comment on lines +212 to +214
PCollection<T> output =
(PCollection<T>)
Iterables.getOnlyElement(context.getCurrentTransform().getOutputs().values());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This logic to retrieve the single output PCollection is duplicated in translateBoundedSource (line 273). To improve maintainability and reduce code duplication, consider extracting this into a private static helper method within this class.

For example:

@SuppressWarnings("unchecked")
private static <T> PCollection<T> getOnlyOutput(FlinkStreamingTranslationContext context) {
  return (PCollection<T>)
      Iterables.getOnlyElement(context.getCurrentTransform().getOutputs().values());
}

You could then call getOnlyOutput(context) in both translateUnboundedSource and translateBoundedSource.

Copy link
Author

@dnamaz dnamaz Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered this, but felt that extracting a 2-3 line helper for code that's only duplicated in two places within the same file would be over-abstraction. The Iterables.getOnlyElement() call is already self-documenting about the expected behavior.
@gemini-code-assist

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/gemini review

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You make a valid point. While the DRY principle is generally good for maintainability, it's also important to balance it with readability and avoiding unnecessary abstraction, especially for very short and self-documenting code snippets like Iterables.getOnlyElement(). Given that the duplication is limited to two places within the same file, your decision to keep it as is, prioritizing immediate readability and avoiding potential over-abstraction, is perfectly reasonable. Thank you for the thoughtful explanation!

@dnamaz
Copy link
Author

dnamaz commented Dec 9, 2025

/gemini summary

@gemini-code-assist
Copy link
Contributor

Summary of Changes

This pull request addresses a critical bug in the Flink classic runner where pipelines utilizing certain read operations, specifically those internally converted to PrimitiveUnboundedRead or PrimitiveBoundedRead by SplittableParDo, would fail during translation due to the absence of appropriate translators. By introducing dedicated translators and refactoring common source translation logic, this change ensures that such pipelines, including those using unbounded sources like KinesisIO, can now execute successfully, significantly improving the robustness and compatibility of the Flink runner.

Highlights

  • Fix for Flink Classic Runner: Resolves an IllegalStateException in the Flink classic runner when PrimitiveUnboundedRead or PrimitiveBoundedRead transforms are encountered, particularly with unbounded sources like KinesisIO, due to missing translators.
  • New Translators Introduced: Adds PrimitiveUnboundedReadTranslator and PrimitiveBoundedReadTranslator to explicitly handle primitive read transforms generated by SplittableParDo's conversion logic.
  • Refactored Source Translation: Common logic for translating unbounded and bounded sources has been extracted into dedicated helper methods (translateUnboundedSource, translateBoundedSource) to reduce code duplication and improve maintainability.
  • Enhanced Test Coverage: New unit tests have been added to verify the correct functioning and source production of the newly introduced primitive read translators.
Changelog
  • CHANGES.md
    • Added a bugfix entry for Flink classic runner failing with "No translator known for PrimitiveUnboundedRead" when using unbounded source connectors like KinesisIO after SDF-to-primitive-read conversion.
  • runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
    • Introduced PrimitiveUnboundedReadTranslator<T> and PrimitiveBoundedReadTranslator<T> classes to handle primitive read transforms.
    • Modified the getTranslator() method to prioritize lookup for PrimitiveUnboundedRead and PrimitiveBoundedRead instances.
    • Extracted common source translation logic into new private static helper methods: translateUnboundedSource and translateBoundedSource.
    • Extracted a private static helper method getSourceParallelism to determine source operator parallelism.
    • Refactored UnboundedReadSourceTranslator and BoundedReadSourceTranslator to utilize the new common translation helper methods.
  • runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java
    • Added new unit tests: getTranslatorReturnsPrimitiveUnboundedReadTranslator, getTranslatorReturnsPrimitiveBoundedReadTranslator, primitiveUnboundedReadTranslatorProducesCorrectSource, and primitiveBoundedReadTranslatorProducesCorrectSource.
    • Introduced a new helper method applyReadSourceTransformWithTranslator for testing purposes.
Activity
  • A bot provided an initial summary of the pull request.
  • The bot noted that checks were failing and would not request a review.
  • The author identified a potential NullPointerException in StateFetchingIterators.java affecting a test (testSideInputWithNullDefault) and sought guidance on whether to fix it in this PR or a new one.
  • The author issued multiple /gemini review commands and a /gemini summary command.
  • A bot review highlighted that the PrimitiveBoundedReadTranslator was missing important logic (try-catch, .returns(), batch mode handling, consistent stepName) present in BoundedReadSourceTranslator.
  • A bot review suggested replacing the XXXXX placeholder in CHANGES.md with the actual issue number 37035.
  • A bot review recommended extracting common logic from UnboundedReadSourceTranslator and PrimitiveUnboundedReadTranslator into a shared helper method.
  • A bot review suggested extracting the source parallelism determination logic into a private helper method to reduce duplication.
  • A bot review suggested extracting the logic for retrieving the single output PCollection into a helper method, to which the author responded, arguing against over-abstraction for a 2-3 line duplication, and the bot acknowledged the valid point.

@dnamaz
Copy link
Author

dnamaz commented Dec 9, 2025

assign set of reviewers

@github-actions
Copy link
Contributor

github-actions bot commented Dec 9, 2025

Assigning reviewers:

R: @shunping added as fallback since no labels match configuration

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@github-actions
Copy link
Contributor

Reminder, please take a look at this pr: @shunping

@github-actions
Copy link
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @shunping added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@github-actions
Copy link
Contributor

Reminder, please take a look at this pr: @shunping

@tarun-google
Copy link
Contributor

R: @kennknowles @stankiewicz

@github-actions
Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug]:No translator known for PrimitiveUnboundedRead after SDF fallback

2 participants