Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions sdks/python/apache_beam/transforms/ptransform.py
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,15 @@ def __init__(self, fn, *args, **kwargs):
# Ensure fn and side inputs are picklable for remote execution.
try:
self.fn = pickler.roundtrip(self.fn)
except RuntimeError as e:
raise RuntimeError('Unable to pickle fn %s: %s' % (self.fn, e))
except (RuntimeError, TypeError, Exception) as e:
raise RuntimeError(
'Unable to pickle fn %s: %s. '
'User code must be serializable (picklable) for distributed '
'execution. This usually happens when lambdas or closures capture '
'non-serializable objects like file handles, database connections, '
'or thread locks. Try: (1) using module-level functions instead of '
'lambdas, (2) initializing resources in setup() methods, '
'(3) checking what your closure captures.' % (self.fn, e)) from e

self.args = pickler.roundtrip(self.args)
self.kwargs = pickler.roundtrip(self.kwargs)
Expand Down
19 changes: 19 additions & 0 deletions sdks/python/apache_beam/transforms/ptransform_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,25 @@ def test_do_with_side_input_as_keyword_arg(self):
lambda x, addon: [x + addon], addon=pvalue.AsSingleton(side))
assert_that(result, equal_to([11, 12, 13]))

def test_callable_non_serializable_error_message(self):
    """Check the error raised when a Map callable captures an unpicklable object.

    Building ``beam.Map`` around a lambda that closes over an object whose
    ``__getstate__`` raises is expected to surface a ``RuntimeError`` whose
    text contains each of the guidance fragments listed below.
    """

    class NonSerializable:
        # Pickling hook that always fails, standing in for a resource that
        # cannot be serialized.
        def __getstate__(self):
            raise RuntimeError('nope')

    unpicklable = NonSerializable()

    with self.assertRaises(RuntimeError) as raised:
        _ = beam.Map(lambda x: unpicklable)

    error_text = str(raised.exception)
    # Fragments are checked in the same order as the original assertions so
    # the first missing one is the one reported.
    expected_fragments = (
        'Unable to pickle fn',
        'User code must be serializable (picklable) for distributed execution.',
        'non-serializable objects like file handles',
        'Try: (1) using module-level functions instead of lambdas',
    )
    for fragment in expected_fragments:
        self.assertIn(fragment, error_text)

def test_do_with_do_fn_returning_string_raises_warning(self):
ex_details = r'.*Returning a str from a ParDo or FlatMap is discouraged.'

Expand Down
Loading