Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@

## Breaking Changes

* Python SDK now does not allow mixing the yield and return statements in `DoFn.process()` ([#22969](https://github.com/apache/beam/issues/22969)). `yield` is recommended for emitting elements and `yield from` for iterators.
* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).

## Deprecations
Expand Down
48 changes: 48 additions & 0 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import traceback
import types
import typing
from itertools import dropwhile

from apache_beam import coders
from apache_beam import pvalue
Expand Down Expand Up @@ -1387,6 +1388,47 @@ def partition_for(self, element, num_partitions, *args, **kwargs):
return self._fn(element, num_partitions, *args, **kwargs)


def _get_function_body_without_inners(func):
source_lines = inspect.getsourcelines(func)[0]
source_lines = dropwhile(lambda x: x.startswith("@"), source_lines)
def_line = next(source_lines).strip()
if def_line.startswith("def ") and def_line.endswith(":"):
first_line = next(source_lines)
indentation = len(first_line) - len(first_line.lstrip())
final_lines = [first_line[indentation:]]

skip_inner_def = False
if first_line[indentation:].startswith("def "):
skip_inner_def = True
for line in source_lines:
line_indentation = len(line) - len(line.lstrip())

if line[indentation:].startswith("def "):
skip_inner_def = True
continue

if skip_inner_def and line_indentation == indentation:
skip_inner_def = False

if skip_inner_def and line_indentation > indentation:
continue
final_lines.append(line[indentation:])

return "".join(final_lines)
else:
return def_line.rsplit(":")[-1].strip()


def _check_fn_use_yield_and_return(fn):
if isinstance(fn, types.BuiltinFunctionType):
return False
try:
source_code = _get_function_body_without_inners(fn)
return "yield " in source_code and "return " in source_code
except TypeError:
return False


class ParDo(PTransformWithSideInputs):
"""A :class:`ParDo` transform.

Expand Down Expand Up @@ -1427,6 +1469,12 @@ def __init__(self, fn, *args, **kwargs):
if not isinstance(self.fn, DoFn):
raise TypeError('ParDo must be called with a DoFn instance.')

# DoFn.process cannot allow both return and yield
if _check_fn_use_yield_and_return(self.fn.process):
raise RuntimeError(
'The yield and return statements in the process method '
f'of {self.fn.__class__ } can not be mixed.')

# Validate the DoFn by creating a DoFnSignature
from apache_beam.runners.common import DoFnSignature
self._signature = DoFnSignature(self.fn)
Expand Down
63 changes: 63 additions & 0 deletions sdks/python/apache_beam/transforms/core_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Unit tests for the core python file."""
# pytype: skip-file

import logging
import unittest

import apache_beam as beam


class TestDoFn1(beam.DoFn):
def process(self, element):
yield element


class TestDoFn2(beam.DoFn):
def process(self, element):
def inner_func(x):
yield x

return inner_func(element)


class TestDoFn3(beam.DoFn):
"""mixing return and yield is not allowed"""
def process(self, element):
if not element:
return -1
yield element


class CreateTest(unittest.TestCase):
def test_dofn_with_yield_and_return(self):
assert beam.ParDo(sum)
assert beam.ParDo(TestDoFn1())
assert beam.ParDo(TestDoFn2())
with self.assertRaises(RuntimeError) as e:
beam.ParDo(TestDoFn3())
self.assertEqual(
str(e.exception),
'The yield and return statements in the process method '
f'of {TestDoFn3().__class__} can not be mixed.')


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()