diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 9c5306e143ec..94e9a0644d04 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -883,8 +883,16 @@ def __init__(self, fn, *args, **kwargs):
     # Ensure fn and side inputs are picklable for remote execution.
     try:
       self.fn = pickler.roundtrip(self.fn)
-    except RuntimeError as e:
-      raise RuntimeError('Unable to pickle fn %s: %s' % (self.fn, e))
+    except Exception as e:
+      # Wrap every failure from the round-trip, not only RuntimeError.
+      raise RuntimeError(
+          'Unable to pickle fn %s: %s. '
+          'User code must be serializable (picklable) for distributed '
+          'execution. This usually happens when lambdas or closures capture '
+          'non-serializable objects like file handles, database connections, '
+          'or thread locks. Try: (1) using module-level functions instead of '
+          'lambdas, (2) initializing resources in setup() methods, '
+          '(3) checking what your closure captures.' % (self.fn, e)) from e
 
     self.args = pickler.roundtrip(self.args)
     self.kwargs = pickler.roundtrip(self.kwargs)
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 9a9bf6ff0a74..8c2acefccdb3 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -163,6 +163,25 @@ def test_do_with_side_input_as_keyword_arg(self):
           lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side))
       assert_that(result, equal_to([11, 12, 13]))
 
+  def test_callable_non_serializable_error_message(self):
+    class NonSerializable:
+      def __getstate__(self):
+        raise RuntimeError('nope')
+
+    bad = NonSerializable()
+
+    with self.assertRaises(RuntimeError) as context:
+      _ = beam.Map(lambda x: bad)
+
+    message = str(context.exception)
+    self.assertIn('Unable to pickle fn', message)
+    self.assertIn(
+        'User code must be serializable (picklable) for distributed execution.',
+        message)
+    self.assertIn('non-serializable objects like file handles', message)
+    self.assertIn(
+        'Try: (1) using module-level functions instead of lambdas', message)
+
   def test_do_with_do_fn_returning_string_raises_warning(self):
     ex_details = r'.*Returning a str from a ParDo or FlatMap is discouraged.'