Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_transform_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import glob
import logging
import os
import shutil
import tempfile
import unittest

Expand Down Expand Up @@ -903,6 +904,60 @@ def test_must_handle_error_output(self):
''',
providers=TEST_PROVIDERS)

def test_error_handling_log_combined_errors(self):
  """Checks that error outputs from two independent transforms can be
  consumed together by a single downstream transform.

  Two MapToFields transforms each declare ``error_handling``; their error
  collections are fed jointly into LogForTesting (which satisfies the
  "error output must be consumed" constraint), while their good outputs
  are flattened into the composite's declared output.
  """
  with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
      pickle_library='cloudpickle')) as p:
    result = p | YamlTransform(
        '''
        type: composite
        transforms:
          - type: Create
            name: Input1
            config:
              elements: [1, 2, 0]
          - type: Create
            name: Input2
            config:
              elements: [3, 'a', 5]
          - type: MapToFields
            name: Inverse
            input: Input1
            config:
              language: python
              fields:
                inverse: "1 / element"
              error_handling:
                output: errors
          - type: MapToFields
            name: Square
            input: Input2
            config:
              language: python
              fields:
                square: "element * element"
              error_handling:
                output: errors
          # Consuming both error outputs in one place; this also marks
          # them as handled so pipeline construction does not fail.
          - type: LogForTesting
            input:
              - Inverse.errors
              - Square.errors
          - type: Flatten
            name: GoodData
            input:
              - Inverse
              - Square
        output: GoodData
        ''',
        providers=TEST_PROVIDERS)
    # Bad elements (0 for Inverse, 'a' for Square) were diverted to the
    # error outputs, so only the four good rows reach GoodData.  After
    # Flatten the schemas are unified, so each row carries the other
    # branch's field as None.
    assert_that(
        result,
        equal_to([
            beam.Row(inverse=1.0, square=None),
            beam.Row(inverse=0.5, square=None),
            beam.Row(square=9, inverse=None),
            beam.Row(square=25, inverse=None)
        ]))

def test_mapping_errors(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
Expand Down Expand Up @@ -1289,6 +1344,106 @@ def test_prefers_same_provider_class(self):
label='StartWith3')


class TestExternalYamlProvider(unittest.TestCase):
  """Tests error handling for transforms loaded from an external YAML
  provider file.

  The provider defines a RaiseElementToPower transform whose body is a
  MapToFields with its own internal ``error_handling`` block, exercising
  the case where a provider-defined transform exposes an error output to
  the pipelines that use it.
  """
  def setUp(self):
    # Write the provider definition to a temp file so it can be loaded
    # via yaml_provider.load_providers, as a real user file would be.
    self.temp_dir = tempfile.mkdtemp()
    self.provider_path = os.path.join(self.temp_dir, 'power_provider.yaml')
    with open(self.provider_path, 'w') as f:
      f.write(
          """
          - type: yaml
            transforms:
              RaiseElementToPower:
                config_schema:
                  properties:
                    n: {type: integer}
                body:
                  type: MapToFields
                  config:
                    language: python
                    append: true
                    fields:
                      power: "element ** {{n}}"
                    error_handling:
                      output: my_error
          """)

  def tearDown(self):
    shutil.rmtree(self.temp_dir)

  def test_provider_with_error_handling(self):
    """The error output declared inside the provider body is reachable
    from the using pipeline as RaiseElementToPower.my_error."""
    loaded_providers = yaml_provider.load_providers(self.provider_path)
    test_providers = yaml_provider.InlineProvider(TEST_PROVIDERS)
    merged_providers = yaml_provider.merge_providers(
        loaded_providers, [test_providers])

    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
        pickle_library='cloudpickle')) as p:
      results = p | YamlTransform(
          '''
          type: composite
          transforms:
            - type: Create
              config:
                elements: [2, 'bad', 3]
            - type: RaiseElementToPower
              input: Create
              config:
                n: 2
            # Reduce error records to just their message so the expected
            # values below can be plain strings.
            - type: PyMap
              name: TrimErrors
              input: RaiseElementToPower.my_error
              config:
                fn: "lambda x: x.msg"
          output:
            good: RaiseElementToPower.good
            bad: TrimErrors
          ''',
          providers=merged_providers)

      # Numeric elements are squared; the string element fails inside the
      # provider's MapToFields and surfaces on the error output.
      assert_that(
          results['good'],
          equal_to([beam.Row(element=2, power=4), beam.Row(element=3,
                                                           power=9)]),
          label="CheckGood")
      assert_that(
          results['bad'],
          equal_to([
              'TypeError("unsupported operand type(s) for ** or pow(): ' +
              '\'str\' and \'int\'")'
          ]),
          label="CheckBad")

  def test_must_consume_error_output(self):
    """Pipeline construction fails if a declared error output is unused."""
    # By adding a dummy error_handling block here, we signal to the static
    # checker that this transform has an error output that must be consumed.
    # The framework is able to handle the "nesting" where the provider for
    # RaiseElementToPower also defines error handling internally.
    loaded_providers = yaml_provider.load_providers(self.provider_path)
    test_providers = yaml_provider.InlineProvider(TEST_PROVIDERS)
    merged_providers = yaml_provider.merge_providers(
        loaded_providers, [test_providers])

    with self.assertRaisesRegex(Exception, 'Unconsumed error output.*'):
      with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
          pickle_library='cloudpickle')) as p:
        _ = p | YamlTransform(
            '''
            type: composite
            transforms:
              - type: Create
                config:
                  elements: [2, 'bad', 3]
              - type: RaiseElementToPower
                input: Create
                config:
                  n: 2
                error_handling:
                  output: my_error
            ''',
            providers=merged_providers)


@beam.transforms.ptransform.annotate_yaml
class LinearTransform(beam.PTransform):
"""A transform used for testing annotate_yaml."""
Expand Down
58 changes: 58 additions & 0 deletions website/www/site/content/en/documentation/sdks/yaml-errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,62 @@ pipeline:
path: /path/to/errors.json
```

## Error Handling with Custom Providers
Custom transforms, such as those defined in separate YAML files via a `YamlProvider`, can also expose error outputs from their underlying transforms.

Consider a file `my_transforms.yaml` that defines a `RaiseElementToPower` transform:
```yaml
# my_transforms.yaml
- type: yaml
transforms:
RaiseElementToPower:
config_schema:
properties:
n: {type: integer}
body:
type: MapToFields
config:
language: python
append: true
fields:
power: "element ** {{n}}"
# This transform internally defines and exposes an error output.
error_handling:
output: my_error
```
This transform takes a numeric element and raises it to the power of `n`. If the element is not a number, an error record is produced instead. The error output of the internal `MapToFields` is named `my_error`, and it is automatically exposed as an error output of the `RaiseElementToPower` transform itself.

When using this transform in a pipeline, you can access this error output and handle it. The main output of the transform will contain only the successfully processed elements.

```yaml
pipeline:
transforms:
- type: Create
config:
elements: [2, 'bad', 3]
- type: RaiseElementToPower
input: Create
config:
n: 2
- type: WriteToJson
name: WriteGood
# The main output contains successfully processed elements.
input: RaiseElementToPower
config:
path: /path/to/good
- type: WriteToJson
name: WriteBad
# The error output is accessed by its name.
input: RaiseElementToPower.my_error
config:
path: /path/to/bad

providers:
- include: my_transforms.yaml

```
In this example, the pipeline separates the good and bad records coming from the custom `RaiseElementToPower` transform. The good records are written to one location, and the error records are written to another.

A pipeline will fail at construction time if an error output is declared (either in a built-in transform or a custom one) but not consumed. This helps ensure that all error paths are considered.

See the [Beam YAML schema documentation](https://beam.apache.org/documentation/sdks/yaml-schema/) for another use of `error_handling`, in a schema context.
Loading