Clear side input caches of dofninvoker when finishing bundle. Add microbenchmarks for map_fn with side inputs. #37123
191 fail, 1 336 skipped, 7 398 pass in 23m 31s
2 files 2 suites 23m 31s ⏱️
8 925 tests 7 398 ✅ 1 336 💤 191 ❌
8 948 runs 7 398 ✅ 1 359 💤 191 ❌
Results for commit fc29a2c.
Annotations
Check warning on line 0 in apache_beam.io.fileio_test.WriteFilesTest
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_streaming_complex_timing (apache_beam.io.fileio_test.WriteFilesTest) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 0s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.io.fileio_test.WriteFilesTest testMethod=test_streaming_complex_timing>
def test_streaming_complex_timing(self):
# Use state on the TestCase class, since other references would be pickled
# into a closure and not have the desired side effects.
#
# TODO(https://github.com/apache/beam/issues/18987): Use assert_that after
# it works for the cases here in streaming mode.
WriteFilesTest.all_records = []
dir = '%s%s' % (self._new_tempdir(), os.sep)
# Setting up the input (TestStream)
ts = TestStream().advance_watermark_to(0)
for elm in WriteFilesTest.LARGER_COLLECTION:
timestamp = int(elm)
ts.add_elements([('key', '%s' % elm)])
if timestamp % 5 == 0 and timestamp != 0:
# TODO(https://github.com/apache/beam/issues/18721): Add many firings
# per window after getting PaneInfo.
ts.advance_processing_time(5)
ts.advance_watermark_to(timestamp)
ts.advance_watermark_to_infinity()
def no_colon_file_naming(*args):
file_name = fileio.destination_prefix_naming()(*args)
return file_name.replace(':', '_')
# The pipeline that we are testing
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
> with TestPipeline(options=options) as p:
apache_beam/io/fileio_test.py:707:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7ef72e495960>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7ef7078ec1c0>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7ef72ff75ba0>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.fileio_test.WriteFilesTest
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_streaming_different_file_types (apache_beam.io.fileio_test.WriteFilesTest) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 0s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.io.fileio_test.WriteFilesTest testMethod=test_streaming_different_file_types>
def test_streaming_different_file_types(self):
dir = self._new_tempdir()
input = iter(WriteFilesTest.SIMPLE_COLLECTION)
ts = (
TestStream().advance_watermark_to(0).add_elements(
[next(input), next(input)]).advance_watermark_to(10).add_elements(
[next(input),
next(input)]).advance_watermark_to(20).add_elements([
next(input), next(input)
]).advance_watermark_to(30).add_elements([
next(input), next(input)
]).advance_watermark_to(40).advance_watermark_to_infinity())
def no_colon_file_naming(*args):
file_name = fileio.destination_prefix_naming()(*args)
return file_name.replace(':', '_')
> with TestPipeline() as p:
apache_beam/io/fileio_test.py:772:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7ef72fa6b850>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7ef705ca2680>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7ef705ca1ff0>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.avroio_test.WriteStreamingTest
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_write_streaming_2_shards_custom_shard_name_template (apache_beam.io.avroio_test.WriteStreamingTest) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 1s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.io.avroio_test.WriteStreamingTest testMethod=test_write_streaming_2_shards_custom_shard_name_template>
num_shards = 2, shard_name_template = '-V-SSSSS-of-NNNNN'
def test_write_streaming_2_shards_custom_shard_name_template(
self, num_shards=2, shard_name_template='-V-SSSSS-of-NNNNN'):
> with TestPipeline() as p:
apache_beam/io/avroio_test.py:856:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x78a2d46b4880>
pipeline = <apache_beam.pipeline.Pipeline object at 0x78a2d412b370>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x78a2d4316800>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.avroio_test.WriteStreamingTest
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_write_streaming_2_shards_custom_shard_name_template_5s_window (apache_beam.io.avroio_test.WriteStreamingTest) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 0s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.io.avroio_test.WriteStreamingTest testMethod=test_write_streaming_2_shards_custom_shard_name_template_5s_window>
num_shards = 2, shard_name_template = '-V-SSSSS-of-NNNNN'
triggering_frequency = 5
def test_write_streaming_2_shards_custom_shard_name_template_5s_window(
self,
num_shards=2,
shard_name_template='-V-SSSSS-of-NNNNN',
triggering_frequency=5):
> with TestPipeline() as p:
apache_beam/io/avroio_test.py:904:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x78a2d43ee2f0>
pipeline = <apache_beam.pipeline.Pipeline object at 0x78a2dcc17b80>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x78a2dcc16b30>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.avroio_test.WriteStreamingTest
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_write_streaming_2_shards_default_shard_name_template (apache_beam.io.avroio_test.WriteStreamingTest) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 1s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.io.avroio_test.WriteStreamingTest testMethod=test_write_streaming_2_shards_default_shard_name_template>
num_shards = 2
def test_write_streaming_2_shards_default_shard_name_template(
self, num_shards=2):
> with TestPipeline() as p:
apache_beam/io/avroio_test.py:805:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x78a2dcc90cd0>
pipeline = <apache_beam.pipeline.Pipeline object at 0x78a304de7a60>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x78a303ccdf60>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.gcp.bigquery_file_loads_test.TestBigQueryFileLoads
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_triggering_frequency_0 (apache_beam.io.gcp.bigquery_file_loads_test.TestBigQueryFileLoads) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 4s]
Raw output
TypeError: 'NoneType' object is not iterable
a = (<apache_beam.io.gcp.bigquery_file_loads_test.TestBigQueryFileLoads testMethod=test_triggering_frequency_0>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py310-cloudcoverage/py310-cloudcoverage/lib/python3.10/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/io/gcp/bigquery_file_loads_test.py:1051: in test_triggering_frequency
with TestPipeline(runner='BundleBasedDirectRunner',
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7ef6b7727d90>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7ef7066a39a0>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7ef6dd33bb50>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.gcp.bigquery_file_loads_test.TestBigQueryFileLoads
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_triggering_frequency_1 (apache_beam.io.gcp.bigquery_file_loads_test.TestBigQueryFileLoads) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 3s]
Raw output
TypeError: 'NoneType' object is not iterable
a = (<apache_beam.io.gcp.bigquery_file_loads_test.TestBigQueryFileLoads testMethod=test_triggering_frequency_1>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py310-cloudcoverage/py310-cloudcoverage/lib/python3.10/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/io/gcp/bigquery_file_loads_test.py:1051: in test_triggering_frequency
with TestPipeline(runner='BundleBasedDirectRunner',
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7ef6de6a59c0>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7ef6f44ccb80>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7ef6f46c2bf0>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.gcp.bigquery_file_loads_test.TestBigQueryFileLoads
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_triggering_frequency_2 (apache_beam.io.gcp.bigquery_file_loads_test.TestBigQueryFileLoads) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 3s]
Raw output
TypeError: 'NoneType' object is not iterable
a = (<apache_beam.io.gcp.bigquery_file_loads_test.TestBigQueryFileLoads testMethod=test_triggering_frequency_2>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py310-cloudcoverage/py310-cloudcoverage/lib/python3.10/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/io/gcp/bigquery_file_loads_test.py:1051: in test_triggering_frequency
with TestPipeline(runner='BundleBasedDirectRunner',
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7ef6de28c730>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7ef706697280>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7ef705c06380>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.gcp.bigquery_file_loads_test.TestBigQueryFileLoads
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_triggering_frequency_3 (apache_beam.io.gcp.bigquery_file_loads_test.TestBigQueryFileLoads) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 2s]
Raw output
TypeError: 'NoneType' object is not iterable
a = (<apache_beam.io.gcp.bigquery_file_loads_test.TestBigQueryFileLoads testMethod=test_triggering_frequency_3>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py310-cloudcoverage/py310-cloudcoverage/lib/python3.10/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/io/gcp/bigquery_file_loads_test.py:1051: in test_triggering_frequency
with TestPipeline(runner='BundleBasedDirectRunner',
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7ef705d35f00>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7ef6dd450130>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7ef6dd452b00>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.gcp.pubsub_test.TestReadFromPubSub
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_read_data_success (apache_beam.io.gcp.pubsub_test.TestReadFromPubSub) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 1s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.io.gcp.pubsub_test.TestReadFromPubSub testMethod=test_read_data_success>
mock_pubsub = <MagicMock name='SubscriberClient' id='140473778759872'>
def test_read_data_success(self, mock_pubsub):
data_encoded = '#x1F937 ¯\\_(ツ)_/¯'.encode('utf-8')
ack_id = 'ack_id'
pull_response = test_utils.create_pull_response(
[test_utils.PullResponseMessage(data_encoded, ack_id=ack_id)])
expected_elements = [data_encoded]
mock_pubsub.return_value.pull.return_value = pull_response
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
> with TestPipeline(options=options) as p:
apache_beam/io/gcp/pubsub_test.py:569:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7fc29844c610>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7fc2a6464490>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7fc29842f2b0>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.gcp.bigquery_file_loads_test.TestBigQueryFileLoads
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_triggering_frequency_4 (apache_beam.io.gcp.bigquery_file_loads_test.TestBigQueryFileLoads) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 3s]
Raw output
TypeError: 'NoneType' object is not iterable
a = (<apache_beam.io.gcp.bigquery_file_loads_test.TestBigQueryFileLoads testMethod=test_triggering_frequency_4>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py310-cloudcoverage/py310-cloudcoverage/lib/python3.10/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/io/gcp/bigquery_file_loads_test.py:1051: in test_triggering_frequency
with TestPipeline(runner='BundleBasedDirectRunner',
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7ef72e66fc40>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7ef705d31b10>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7ef6dcd1ccd0>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.gcp.pubsub_test.TestReadFromPubSub
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_read_message_id_label_unsupported (apache_beam.io.gcp.pubsub_test.TestReadFromPubSub) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 0s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.io.gcp.pubsub_test.TestReadFromPubSub testMethod=test_read_message_id_label_unsupported>
unused_mock_pubsub = <MagicMock name='SubscriberClient' id='140472514670752'>
def test_read_message_id_label_unsupported(self, unused_mock_pubsub):
# id_label is unsupported in DirectRunner.
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
with self.assertRaisesRegex(NotImplementedError,
r'id_label is not supported'):
> with TestPipeline(options=options) as p:
apache_beam/io/gcp/pubsub_test.py:720:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.gcp.pubsub_test.TestReadFromPubSub
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_read_messages_success (apache_beam.io.gcp.pubsub_test.TestReadFromPubSub) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 0s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.io.gcp.pubsub_test.TestReadFromPubSub testMethod=test_read_messages_success>
mock_pubsub = <MagicMock name='SubscriberClient' id='140473752588720'>
def test_read_messages_success(self, mock_pubsub):
data = b'data'
publish_time_secs = 1520861821
publish_time_nanos = 234567000
attributes = {'key': 'value'}
ack_id = 'ack_id'
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
])
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes),
timestamp.Timestamp(1520861821.234567), [window.GlobalWindow()])
]
mock_pubsub.return_value.pull.return_value = pull_response
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
> with TestPipeline(options=options) as p:
apache_beam/io/gcp/pubsub_test.py:523:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7fc29830f280>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7fc2983ba9e0>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7fc2982f8340>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.gcp.pubsub_test.TestReadFromPubSub
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_read_messages_timestamp_attribute_fail_parse (apache_beam.io.gcp.pubsub_test.TestReadFromPubSub) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 0s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.io.gcp.pubsub_test.TestReadFromPubSub testMethod=test_read_messages_timestamp_attribute_fail_parse>
mock_pubsub = <MagicMock name='SubscriberClient' id='140473754720176'>
def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
data = b'data'
attributes = {'time': '1337 unparseable'}
publish_time_secs = 1520861821
publish_time_nanos = 234567000
ack_id = 'ack_id'
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
])
mock_pubsub.return_value.pull.return_value = pull_response
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
_ = (
p
| ReadFromPubSub(
'projects/fakeprj/topics/a_topic',
None,
None,
with_attributes=True,
timestamp_attribute='time'))
with self.assertRaisesRegex(ValueError, r'parse'):
> p.run()
apache_beam/io/gcp/pubsub_test.py:709:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.gcp.pubsub_test.TestReadFromPubSub
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_read_messages_timestamp_attribute_milli_success (apache_beam.io.gcp.pubsub_test.TestReadFromPubSub) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 0s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.io.gcp.pubsub_test.TestReadFromPubSub testMethod=test_read_messages_timestamp_attribute_milli_success>
mock_pubsub = <MagicMock name='SubscriberClient' id='140472514288544'>
def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
data = b'data'
attributes = {'time': '1337'}
publish_time_secs = 1520861821
publish_time_nanos = 234567000
ack_id = 'ack_id'
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
])
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes),
timestamp.Timestamp(micros=int(attributes['time']) * 1000),
[window.GlobalWindow()]),
]
mock_pubsub.return_value.pull.return_value = pull_response
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
> with TestPipeline(options=options) as p:
apache_beam/io/gcp/pubsub_test.py:599:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7fc29876b100>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7fc2999cf5e0>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7fc29863bca0>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.gcp.pubsub_test.TestReadFromPubSub
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_read_messages_timestamp_attribute_missing (apache_beam.io.gcp.pubsub_test.TestReadFromPubSub) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 0s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.io.gcp.pubsub_test.TestReadFromPubSub testMethod=test_read_messages_timestamp_attribute_missing>
mock_pubsub = <MagicMock name='SubscriberClient' id='140473803361728'>
def test_read_messages_timestamp_attribute_missing(self, mock_pubsub):
data = b'data'
attributes = {}
publish_time_secs = 1520861821
publish_time_nanos = 234567000
publish_time = '2018-03-12T13:37:01.234567Z'
ack_id = 'ack_id'
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
])
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes),
timestamp.Timestamp.from_rfc3339(publish_time),
[window.GlobalWindow()]),
]
mock_pubsub.return_value.pull.return_value = pull_response
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
> with TestPipeline(options=options) as p:
apache_beam/io/gcp/pubsub_test.py:670:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7fc298182c20>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7fc29a427d30>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7fc2c355b9a0>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.gcp.pubsub_test.TestReadFromPubSub
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_read_messages_timestamp_attribute_rfc3339_success (apache_beam.io.gcp.pubsub_test.TestReadFromPubSub) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 0s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.io.gcp.pubsub_test.TestReadFromPubSub testMethod=test_read_messages_timestamp_attribute_rfc3339_success>
mock_pubsub = <MagicMock name='SubscriberClient' id='140473754961808'>
def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
data = b'data'
attributes = {'time': '2018-03-12T13:37:01.234567Z'}
publish_time_secs = 1337000000
publish_time_nanos = 133700000
ack_id = 'ack_id'
pull_response = test_utils.create_pull_response([
test_utils.PullResponseMessage(
data, attributes, publish_time_secs, publish_time_nanos, ack_id)
])
expected_elements = [
TestWindowedValue(
PubsubMessage(data, attributes),
timestamp.Timestamp.from_rfc3339(attributes['time']),
[window.GlobalWindow()]),
]
mock_pubsub.return_value.pull.return_value = pull_response
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
> with TestPipeline(options=options) as p:
apache_beam/io/gcp/pubsub_test.py:634:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7fc2986f4c10>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7fc2c09bdcf0>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7fc2c0aeba90>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.gcp.pubsub_test.TestReadFromPubSub
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_read_strings_success (apache_beam.io.gcp.pubsub_test.TestReadFromPubSub) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 0s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.io.gcp.pubsub_test.TestReadFromPubSub testMethod=test_read_strings_success>
mock_pubsub = <MagicMock name='SubscriberClient' id='140473777014912'>
def test_read_strings_success(self, mock_pubsub):
data = '#x1F937 ¯\\_(ツ)_/¯'
data_encoded = data.encode('utf-8')
ack_id = 'ack_id'
pull_response = test_utils.create_pull_response(
[test_utils.PullResponseMessage(data_encoded, ack_id=ack_id)])
expected_elements = [data]
mock_pubsub.return_value.pull.return_value = pull_response
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
> with TestPipeline(options=options) as p:
apache_beam/io/gcp/pubsub_test.py:548:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7fc290370df0>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7fc2983e6da0>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7fc290762380>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.examples.snippets.snippets_test.SnippetsTest
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_examples_wordcount_streaming (apache_beam.examples.snippets.snippets_test.SnippetsTest) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 1s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.examples.snippets.snippets_test.SnippetsTest testMethod=test_examples_wordcount_streaming>
unused_mocks = (<MagicMock name='WriteToPubSub' id='137455069560256'>, <MagicMock name='ReadFromPubSub' id='137455069552288'>)
FakeReadFromPubSub = <function SnippetsTest.test_examples_wordcount_streaming.<locals>.FakeReadFromPubSub at 0x7d03db337ac0>
FakeWriteToPubSub = <function SnippetsTest.test_examples_wordcount_streaming.<locals>.FakeWriteToPubSub at 0x7d03db3349d0>
input_topic = 'projects/fake-beam-test-project/topic/intopic'
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
@mock.patch('apache_beam.io.ReadFromPubSub')
@mock.patch('apache_beam.io.WriteToPubSub')
def test_examples_wordcount_streaming(self, *unused_mocks):
def FakeReadFromPubSub(topic=None, subscription=None, values=None):
expected_topic = topic
expected_subscription = subscription
def _inner(topic=None, subscription=None):
assert topic == expected_topic
assert subscription == expected_subscription
return TestStream().add_elements(values)
return _inner
class AssertTransform(beam.PTransform):
def __init__(self, matcher):
self.matcher = matcher
def expand(self, pcoll):
assert_that(pcoll, self.matcher)
def FakeWriteToPubSub(topic=None, values=None):
expected_topic = topic
def _inner(topic=None, subscription=None):
assert topic == expected_topic
return AssertTransform(equal_to(values))
return _inner
# Test basic execution.
input_topic = 'projects/fake-beam-test-project/topic/intopic'
input_values = [
TimestampedValue(b'a a b', 1),
TimestampedValue('#x1F937 ¯\\_(ツ)_/¯ b b '.encode('utf-8'), 12),
TimestampedValue(b'a b c c c', 20)
]
output_topic = 'projects/fake-beam-test-project/topic/outtopic'
output_values = [b'a: 1', b'a: 2', b'b: 1', b'b: 3', b'c: 3']
beam.io.ReadFromPubSub = (
FakeReadFromPubSub(topic=input_topic, values=input_values))
beam.io.WriteToPubSub = (
FakeWriteToPubSub(topic=output_topic, values=output_values))
test_argv = [
'unused_argv[0]',
'--input_topic',
'projects/fake-beam-test-project/topic/intopic',
'--output_topic',
'projects/fake-beam-test-project/topic/outtopic'
]
with mock.patch.object(sys, 'argv', test_argv):
> snippets.examples_wordcount_streaming()
apache_beam/examples/snippets/snippets_test.py:893:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/examples/snippets/snippets.py:487: in examples_wordcount_streaming
with TestPipeline(options=beam_options) as pipeline:
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7d03c0f07be0>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7d0403227490>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7d03c0f29810>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.examples.snippets.snippets_test.SnippetsTest
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_model_composite_triggers (apache_beam.examples.snippets.snippets_test.SnippetsTest) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 1s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.examples.snippets.snippets_test.SnippetsTest testMethod=test_model_composite_triggers>
def test_model_composite_triggers(self):
pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True
> with TestPipeline(options=pipeline_options) as p:
apache_beam/examples/snippets/snippets_test.py:1138:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7d03c0f537c0>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7d0401ee2290>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7d03c076f2e0>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.examples.snippets.snippets_test.SnippetsTest
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_model_early_late_triggers (apache_beam.examples.snippets.snippets_test.SnippetsTest) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 1s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.examples.snippets.snippets_test.SnippetsTest testMethod=test_model_early_late_triggers>
def test_model_early_late_triggers(self):
pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True
> with TestPipeline(options=pipeline_options) as p:
apache_beam/examples/snippets/snippets_test.py:1082:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7d03c0b55c30>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7d03c0f80e50>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7d03c0f83d00>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.gcp.bigquery_test.TestWriteToBigQuery
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_streaming_triggering_frequency_with_auto_sharding (apache_beam.io.gcp.bigquery_test.TestWriteToBigQuery) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 0s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.io.gcp.bigquery_test.TestWriteToBigQuery testMethod=test_streaming_triggering_frequency_with_auto_sharding>
def test_streaming_triggering_frequency_with_auto_sharding(self):
def noop(table, **kwargs):
return []
client = mock.Mock()
client.insert_rows_json = mock.Mock(side_effect=noop)
opt = StandardOptions()
opt.streaming = True
> with beam.Pipeline(runner='BundleBasedDirectRunner', options=opt) as p:
apache_beam/io/gcp/bigquery_test.py:1062:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7ef713e599f0>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7ef705aca680>
options = <apache_beam.options.pipeline_options.StandardOptions object at 0x7ef6f441afb0>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.gcp.bigquery_test.BigQueryStreamingInsertsErrorHandling
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_insert_rows_json_errors_retry_always_0 (apache_beam.io.gcp.bigquery_test.BigQueryStreamingInsertsErrorHandling) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 1s]
Raw output
TypeError: 'NoneType' object is not iterable
a = (<apache_beam.io.gcp.bigquery_test.BigQueryStreamingInsertsErrorHandling testMethod=test_insert_rows_json_errors_retry_always_0>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py310-cloudcoverage/py310-cloudcoverage/lib/python3.10/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/io/gcp/bigquery_test.py:1668: in test_insert_rows_json_errors_retry_always
with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7ef6de0f5db0>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7ef6dd30e470>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7ef6de250a00>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.io.gcp.bigquery_test.BigQueryStreamingInsertsErrorHandling
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_insert_rows_json_errors_retry_always_1 (apache_beam.io.gcp.bigquery_test.BigQueryStreamingInsertsErrorHandling) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 1s]
Raw output
TypeError: 'NoneType' object is not iterable
a = (<apache_beam.io.gcp.bigquery_test.BigQueryStreamingInsertsErrorHandling testMethod=test_insert_rows_json_errors_retry_always_1>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py310-cloudcoverage/py310-cloudcoverage/lib/python3.10/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/io/gcp/bigquery_test.py:1668: in test_insert_rows_json_errors_retry_always
with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7ef705a3c4f0>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7ef6dc18e3e0>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7ef6f448a740>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError
Check warning on line 0 in apache_beam.examples.snippets.snippets_test.SnippetsTest
github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, highmem)
test_model_other_composite_triggers (apache_beam.examples.snippets.snippets_test.SnippetsTest) failed
sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-cloudcoverage.xml [took 1s]
Raw output
TypeError: 'NoneType' object is not iterable
self = <apache_beam.examples.snippets.snippets_test.SnippetsTest testMethod=test_model_other_composite_triggers>
def test_model_other_composite_triggers(self):
pipeline_options = PipelineOptions(
flags=['--streaming', '--allow_unsafe_triggers'])
> with TestPipeline(options=pipeline_options) as p:
apache_beam/examples/snippets/snippets_test.py:1169:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:646: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:118: in run
result = super().run(
apache_beam/pipeline.py:599: in run
self._options).run(False)
apache_beam/pipeline.py:623: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:245: in run_pipeline
return runner.run_pipeline(pipeline, options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.direct.direct_runner.BundleBasedDirectRunner object at 0x7d03c0a76560>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7d03c0e7b8b0>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7d03c069a1d0>
def run_pipeline(self, pipeline, options):
"""Execute the entire pipeline and returns an DirectPipelineResult."""
# TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
# with resolving imports when they are at top.
# pylint: disable=wrong-import-position
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.direct.consumer_tracking_pipeline_visitor import ConsumerTrackingPipelineVisitor
from apache_beam.runners.direct.evaluation_context import EvaluationContext
from apache_beam.runners.direct.executor import Executor
from apache_beam.runners.direct.transform_evaluator import TransformEvaluatorRegistry
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.external import ExternalTransform
class VerifyNoCrossLanguageTransforms(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, ExternalTransform):
raise RuntimeError(
"Streaming Python direct runner "
"does not support cross-language pipelines."
"Please use other runners such as FlinkRunner, "
"DataflowRunner, or PrismRunner.")
pipeline.visit(VerifyNoCrossLanguageTransforms())
# If the TestStream I/O is used, use a mock test clock.
class TestStreamUsageVisitor(PipelineVisitor):
"""Visitor determining whether a Pipeline uses a TestStream."""
def __init__(self):
self.uses_test_stream = False
def visit_transform(self, applied_ptransform):
if isinstance(applied_ptransform.transform, TestStream):
self.uses_test_stream = True
visitor = TestStreamUsageVisitor()
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()
# Performing configured PTransform overrides.
pipeline.replace_all(_get_transform_overrides(options))
_LOGGER.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
pipeline.visit(self.consumer_tracking_visitor)
evaluation_context = EvaluationContext(
options,
BundleFactory(
stacked=options.view_as(
DirectOptions).direct_runner_use_stacked_bundle),
self.consumer_tracking_visitor.root_transforms,
self.consumer_tracking_visitor.value_to_consumers,
self.consumer_tracking_visitor.step_names,
self.consumer_tracking_visitor.views,
clock)
executor = Executor(
self.consumer_tracking_visitor.value_to_consumers,
TransformEvaluatorRegistry(evaluation_context),
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options(
> {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
E TypeError: 'NoneType' object is not iterable
apache_beam/runners/direct/direct_runner.py:617: TypeError