Skip to content
Merged
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1387,6 +1387,11 @@ def partition_for(self, element, num_partitions, *args, **kwargs):
return self._fn(element, num_partitions, *args, **kwargs)


def _check_fn_use_yield_and_return(fn):
source_code = inspect.getsource(fn)
return " yield " in source_code and " return " in source_code


class ParDo(PTransformWithSideInputs):
"""A :class:`ParDo` transform.

Expand Down Expand Up @@ -1427,6 +1432,12 @@ def __init__(self, fn, *args, **kwargs):
if not isinstance(self.fn, DoFn):
raise TypeError('ParDo must be called with a DoFn instance.')

# DoFn.process cannot allow both return and yield
if _check_fn_use_yield_and_return(self.fn.process):
raise RuntimeError(
'The yield and return statements in the process method '
f'of {self.fn.__class__ } can not be mixed.')

# Validate the DoFn by creating a DoFnSignature
from apache_beam.runners.common import DoFnSignature
self._signature = DoFnSignature(self.fn)
Expand Down
59 changes: 59 additions & 0 deletions sdks/python/apache_beam/transforms/core_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Unit tests for the core python file."""
# pytype: skip-file

import logging
import unittest

import apache_beam as beam


class TestDoFn1(beam.DoFn):
def process(self, element):
yield element


class TestDoFn2(beam.DoFn):
def process(self, element):
return element


class TestDoFn3(beam.DoFn):
"""mixing return and yield is not allowed"""
def process(self, element):
if not element:
return -1
yield element


class CreateTest(unittest.TestCase):
def test_dofn_with_yield_and_return(self):
assert beam.ParDo(TestDoFn1())
assert beam.ParDo(TestDoFn2())
with self.assertRaises(RuntimeError) as e:
beam.ParDo(TestDoFn3())
self.assertEqual(
str(e.exception),
'The yield and return statements in the process method '
f'of {TestDoFn3().__class__} can not be mixed.')


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()