diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index a9c1b91d9d54..562941918a81 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -102,9 +102,11 @@ cdef class PerWindowInvoker(DoFnInvoker):
   cdef dict kwargs_for_process_batch
   cdef list placeholders_for_process_batch
   cdef bint has_windowed_inputs
-  cdef bint recalculate_window_args
-  cdef bint has_cached_window_args
-  cdef bint has_cached_window_batch_args
+  cdef bint should_cache_args
+  cdef list cached_args_for_process
+  cdef dict cached_kwargs_for_process
+  cdef list cached_args_for_process_batch
+  cdef dict cached_kwargs_for_process_batch
   cdef object process_method
   cdef object process_batch_method
   cdef bint is_splittable
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 034090cf7bdc..effe984e73f2 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -812,16 +812,15 @@ def __init__(
     self.current_window_index = None
     self.stop_window_index = None
 
-    # TODO(https://github.com/apache/beam/issues/28776): Remove caching after
-    # fully rolling out.
-    # If true, always recalculate window args. If false, has_cached_window_args
-    # and has_cached_window_batch_args will be set to true if the corresponding
-    # self.args_for_process, have been updated and should be reused directly.
-    self.recalculate_window_args = (
-        self.has_windowed_inputs or 'disable_global_windowed_args_caching'
-        in RuntimeValueProvider.experiments)
-    self.has_cached_window_args = False
-    self.has_cached_window_batch_args = False
+    # If true, after the first process invocation the args for process will
+    # be cached in cached_args_for_process and cached_kwargs_for_process and
+    # reused on subsequent invocations in the same bundle.
+    self.should_cache_args = (not self.has_windowed_inputs)
+    self.cached_args_for_process = None
+    self.cached_kwargs_for_process = None
+    # See above; similar cached args for process_batch invocations.
+    self.cached_args_for_process_batch = None
+    self.cached_kwargs_for_process_batch = None
 
     # Try to prepare all the arguments that can just be filled in
     # without any additional work. in the process function.
@@ -984,9 +983,9 @@ def _invoke_process_per_window(
       additional_kwargs,
   ):
     # type: (...) -> Optional[SplitResultResidual]
-    if self.has_cached_window_args:
+    if self.cached_args_for_process:
       args_for_process, kwargs_for_process = (
-          self.args_for_process, self.kwargs_for_process)
+          self.cached_args_for_process, self.cached_kwargs_for_process)
     else:
       if self.has_windowed_inputs:
         assert len(windowed_value.windows) <= 1
@@ -997,10 +996,9 @@ def _invoke_process_per_window(
       side_inputs.extend(additional_args)
       args_for_process, kwargs_for_process = util.insert_values_in_args(
          self.args_for_process, self.kwargs_for_process, side_inputs)
-      if not self.recalculate_window_args:
-        self.args_for_process, self.kwargs_for_process = (
+      if self.should_cache_args:
+        self.cached_args_for_process, self.cached_kwargs_for_process = (
            args_for_process, kwargs_for_process)
-        self.has_cached_window_args = True
 
     # Extract key in the case of a stateful DoFn. Note that in the case of a
     # stateful DoFn, we set during __init__ self.has_windowed_inputs to be
@@ -1088,9 +1086,9 @@ def _invoke_process_batch_per_window(
   ):
     # type: (...) -> Optional[SplitResultResidual]
 
-    if self.has_cached_window_batch_args:
+    if self.cached_args_for_process_batch:
       args_for_process_batch, kwargs_for_process_batch = (
-          self.args_for_process_batch, self.kwargs_for_process_batch)
+          self.cached_args_for_process_batch, self.cached_kwargs_for_process_batch)
     else:
       if self.has_windowed_inputs:
         assert isinstance(windowed_batch, HomogeneousWindowedBatch)
@@ -1107,10 +1105,9 @@ def _invoke_process_batch_per_window(
              side_inputs,
          )
      )
-      if not self.recalculate_window_args:
-        self.args_for_process_batch, self.kwargs_for_process_batch = (
+      if self.should_cache_args:
+        self.cached_args_for_process_batch, self.cached_kwargs_for_process_batch = (
            args_for_process_batch, kwargs_for_process_batch)
-        self.has_cached_window_batch_args = True
 
     for i, p in self.placeholders_for_process_batch:
       if core.DoFn.ElementParam == p:
@@ -1150,6 +1147,15 @@ def _invoke_process_batch_per_window(
            *args_for_process_batch,
            **kwargs_for_process_batch),
        self.threadsafe_watermark_estimator)
 
+  def invoke_finish_bundle(self):
+    # type: () -> None
+    # Clear the cached args to allow for refreshing of side inputs across
+    # bundles.
+    self.cached_args_for_process, self.cached_kwargs_for_process = (None, None)
+    self.cached_args_for_process_batch, self.cached_kwargs_for_process_batch = (
+        None, None)
+
+    super(PerWindowInvoker, self).invoke_finish_bundle()
+
   @staticmethod
   def _try_split(
       fraction,
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 73b0321b5de4..ff145c51e96f 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -613,7 +613,9 @@ def visit_transform(self, applied_ptransform):
        evaluation_context)
     # DirectRunner does not support injecting
     # PipelineOptions values at runtime
-    RuntimeValueProvider.set_runtime_options({})
+    RuntimeValueProvider.set_runtime_options(
+        {'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
+    )
     # Start the executor. This is a non-blocking call, it will start the
     # execution in background threads and return.
     executor.start(self.consumer_tracking_visitor.root_transforms)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index fdf291cb6f12..078e1335d317 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -151,6 +151,7 @@ def run_pipeline(
     if not 'beam_fn_api' in experiments:
       experiments.append('beam_fn_api')
     options.view_as(pipeline_options.DebugOptions).experiments = experiments
+    RuntimeValueProvider.set_runtime_options({'experiments': set(experiments)})
 
     # This is sometimes needed if type checking is disabled
     # to enforce that the inputs (and outputs) of GroupByKey operations
diff --git a/sdks/python/apache_beam/testing/load_tests/microbenchmarks_test.py b/sdks/python/apache_beam/testing/load_tests/microbenchmarks_test.py
index 34d4080c072f..16e0e9d56cd9 100644
--- a/sdks/python/apache_beam/testing/load_tests/microbenchmarks_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/microbenchmarks_test.py
@@ -54,6 +54,7 @@
 from apache_beam.testing.load_tests.load_test import LoadTest
 from apache_beam.tools import fn_api_runner_microbenchmark
+from apache_beam.tools import map_fn_microbenchmark
 from apache_beam.tools import teststream_microbenchmark
 from apache_beam.transforms.util import _BatchSizeEstimator
@@ -92,6 +93,19 @@ def _run_fn_api_runner_microbenchmark(self):
        'fn_api_runner_microbenchmark_per_element_cost_ms': b * 1000,
     }
 
+  def _run_map_fn_microbenchmark(self):
+    start = time.perf_counter()
+    result = map_fn_microbenchmark.run_benchmark(verbose=False)
+    sizes = list(result[0].values())[0]
+    costs = list(result[1].values())[0]
+    a, b = _BatchSizeEstimator.linear_regression_no_numpy(sizes, costs)
+
+    return {
+        'map_fn_microbenchmark_runtime_sec': time.perf_counter() - start,
+        'map_fn_microbenchmark_fixed_cost_ms': a * 1000,
+        'map_fn_microbenchmark_per_element_cost_ms': b * 1000,
+    }
+
 
 if __name__ == '__main__':
   logging.basicConfig(level=logging.INFO)
diff --git a/sdks/python/apache_beam/tools/map_fn_microbenchmark.py b/sdks/python/apache_beam/tools/map_fn_microbenchmark.py
index cdbc5c4e6cb4..19ffda35d841 100644
--- a/sdks/python/apache_beam/tools/map_fn_microbenchmark.py
+++ b/sdks/python/apache_beam/tools/map_fn_microbenchmark.py
@@ -23,7 +23,7 @@
 This executes the same codepaths that are run on the Fn API (and Dataflow)
 workers, but is generally easier to run (locally) and more stable. It does
-not, on the other hand, excercise any non-trivial amount of IO (e.g. shuffle).
+not, on the other hand, exercise any non-trivial amount of IO (e.g. shuffle).
 
 Run as
 
@@ -32,41 +32,103 @@
 # pytype: skip-file
 
+import argparse
 import logging
-import time
-
-from scipy import stats
 
 import apache_beam as beam
+import apache_beam.options.pipeline_options
 from apache_beam.tools import utils
+from apache_beam.transforms.window import FixedWindows
 
 
-def run_benchmark(num_maps=100, num_runs=10, num_elements_step=1000):
-  timings = {}
-  for run in range(num_runs):
-    num_elements = num_elements_step * run + 1
-    start = time.time()
+def map_pipeline(num_elements, num_maps=100):
+  def _pipeline_runner():
     with beam.Pipeline() as p:
       pc = p | beam.Create(list(range(num_elements)))
       for ix in range(num_maps):
-        pc = pc | 'Map%d' % ix >> beam.FlatMap(lambda x: (None, ))
-    timings[num_elements] = time.time() - start
-    print(
-        "%6d element%s %g sec" % (
-            num_elements,
-            " " if num_elements == 1 else "s",
-            timings[num_elements]))
-
-  print()
-  # pylint: disable=unused-variable
-  gradient, intercept, r_value, p_value, std_err = stats.linregress(
-      *list(zip(*list(timings.items()))))
-  print("Fixed cost ", intercept)
-  print("Per-element ", gradient / num_maps)
-  print("R^2 ", r_value**2)
+        pc = pc | 'Map%d' % ix >> beam.FlatMap(lambda x: (None,))
+
+  return _pipeline_runner
+
+
+def map_with_global_side_input_pipeline(num_elements, num_maps=100):
+  def add(element, side_input):
+    return element + side_input
+
+  def _pipeline_runner():
+    with beam.Pipeline() as p:
+      side = p | 'CreateSide' >> beam.Create([1])
+      pc = p | 'CreateMain' >> beam.Create(list(range(num_elements)))
+      for ix in range(num_maps):
+        pc = pc | 'Map%d' % ix >> beam.Map(add, beam.pvalue.AsSingleton(side))
+
+  return _pipeline_runner
+
+
+def map_with_global_side_input_pipeline_uncached(num_elements, num_maps=100):
+  def add(element, side_input):
+    return element + side_input
+
+  def _pipeline_runner():
+    beam_options = beam.options.pipeline_options.DebugOptions()
+    beam_options.add_experiment('disable_global_windowed_args_caching')
+    with beam.Pipeline(options=beam_options) as p:
+      side = p | 'CreateSide' >> beam.Create([1])
+      pc = p | 'CreateMain' >> beam.Create(list(range(num_elements)))
+      for ix in range(num_maps):
+        pc = pc | 'Map%d' % ix >> beam.Map(add, beam.pvalue.AsSingleton(side))
+
+  return _pipeline_runner
+
+
+def map_with_fixed_window_side_input_pipeline(num_elements, num_maps=100):
+  def add(element, side_input):
+    return element + side_input
+
+  def _pipeline_runner():
+    with beam.Pipeline() as p:
+      side = (
+          p | 'CreateSide' >> beam.Create([1])
+          | 'WindowSide' >> beam.WindowInto(FixedWindows(1000)))
+      pc = (
+          p | 'CreateMain' >> beam.Create(list(range(num_elements)))
+          | 'WindowMain' >> beam.WindowInto(FixedWindows(1000)))
+      for ix in range(num_maps):
+        pc = pc | 'Map%d' % ix >> beam.Map(add, beam.pvalue.AsSingleton(side))
+
+  return _pipeline_runner
+
+
+def run_benchmark(
+    starting_point=1, num_runs=10, num_elements_step=100, verbose=True,
+    profile_filename_base=None,
+):
+  suite = [
+      utils.LinearRegressionBenchmarkConfig(
+          map_pipeline, starting_point, num_elements_step, num_runs
+      ),
+      utils.BenchmarkConfig(
+          map_with_global_side_input_pipeline,
+          starting_point * 1000,
+          num_runs,
+      ),
+      utils.BenchmarkConfig(
+          map_with_fixed_window_side_input_pipeline,
+          starting_point * 1000,
+          num_runs,
+      ),
+  ]
+  return utils.run_benchmarks(
+      suite, verbose=verbose, profile_filename_base=profile_filename_base)
 
 
 if __name__ == '__main__':
   logging.basicConfig()
   utils.check_compiled('apache_beam.runners.common')
-  run_benchmark()
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--num_runs', default=10, type=int)
+  parser.add_argument('--starting_point', default=1, type=int)
+  parser.add_argument('--increment', default=100, type=int)
+  parser.add_argument('--verbose', default=True, type=bool)
+  parser.add_argument('--profile_filename_base', default=None, type=str)
+  options = parser.parse_args()
+
+  run_benchmark(
+      options.starting_point,
+      options.num_runs,
+      options.increment,
+      options.verbose,
+      options.profile_filename_base,
+  )
diff --git a/sdks/python/apache_beam/tools/utils.py b/sdks/python/apache_beam/tools/utils.py
index e3df3f2c1c6f..1124ac156f86 100644
--- a/sdks/python/apache_beam/tools/utils.py
+++ b/sdks/python/apache_beam/tools/utils.py
@@ -20,6 +20,7 @@
 # pytype: skip-file
 
 import collections
+import cProfile
 import gc
 import importlib
 import os
@@ -70,10 +71,11 @@ def __call__(self):
   size: int
   num_runs: int
 
+  def name(self):
+    return getattr(self.benchmark, '__name__', str(self.benchmark))
+
   def __str__(self):
-    return "%s, %s element(s)" % (
-        getattr(self.benchmark, '__name__', str(self.benchmark)),
-        str(self.size))
+    return "%s, %s element(s)" % (self.name(), str(self.size))
 
 
 class LinearRegressionBenchmarkConfig(NamedTuple):
@@ -109,7 +111,8 @@ def __str__(self):
        str(self.increment))
 
 
-def run_benchmarks(benchmark_suite, verbose=True):
+def run_benchmarks(benchmark_suite, verbose=True,
+                   profile_filename_base=None):
   """Runs benchmarks, and collects execution times.
 
   A simple instrumentation to run a callable several times, collect and print
@@ -123,12 +126,15 @@ def run_benchmarks(benchmark_suite, verbose=True,
     A dictionary of the form string -> list of floats. Keys of the dictionary
     are benchmark names, values are execution times in seconds for each run.
   """
+  profiler = cProfile.Profile()
   def run(benchmark: BenchmarkFactoryFn, size: int):
     # Contain each run of a benchmark inside a function so that any temporary
     # objects can be garbage-collected after the run.
     benchmark_instance_callable = benchmark(size)
     start = time.time()
+    profiler.enable()
     _ = benchmark_instance_callable()
+    profiler.disable()
     return time.time() - start
 
   cost_series = collections.defaultdict(list)
@@ -164,6 +170,11 @@ def run(benchmark: BenchmarkFactoryFn, size: int):
     if verbose:
       print("")
 
+    if profile_filename_base:
+      filename = profile_filename_base + benchmark_config.name() + '.prof'
+      print("Dumping profile to " + filename)
+      profiler.dump_stats(filename)
+
   if verbose:
     pad_length = max([len(str(bc)) for bc in benchmark_suite])
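
Usage note (not part of the patch): a minimal sketch of driving the updated
benchmark entry point directly once the changes above are applied. It relies
only on the run_benchmark() signature and the profile_filename_base hook
introduced in this diff; the output path prefix is an arbitrary example.

    # Sketch only; assumes the patched apache_beam.tools modules are importable.
    from apache_beam.tools import map_fn_microbenchmark

    # With profile_filename_base set, utils.run_benchmarks dumps one cProfile
    # file per benchmark, e.g. /tmp/mapfn_map_pipeline.prof (example path).
    map_fn_microbenchmark.run_benchmark(
        starting_point=1,
        num_runs=5,
        num_elements_step=100,
        verbose=True,
        profile_filename_base='/tmp/mapfn_')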