Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 1 addition & 26 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,32 +65,7 @@

## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).

## Deprecations

* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).

## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Security Fixes

* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).

## Known Issues

[comment]: # ( When updating known issues after release, make sure also update website blog in website/www/site/content/blog.)
* ([#X](https://github.com/apache/beam/issues/X)).
* Added DATE, DATETIME, and JSON type support to BigQuery I/O for use with Storage Write API and BEAM_ROW format (Python) ([#25946](https://github.com/apache/beam/issues/25946)).

# [2.71.0] - Unreleased

Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
"BYTES": bytes,
"TIMESTAMP": apache_beam.utils.timestamp.Timestamp,
"GEOGRAPHY": str,
"DATE": str,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember DATE/DATETIME types were already supported in Python BigQuery Storage API. cc: @ahmedabu98

If there is indeed still a gap, we should use a Python language type rather than str. Similar treatment was done in Python JdbcIO (Jdbc Date type and time type)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @Abacn! From our experience, these are definitely missing. I am not sure whether it is related to something also missing in the under-the-hood xlang implementation that uses the Java SDK, but Beam would block us before we could even deploy to Dataflow, reporting these types as unsupported.

I'd love to be wrong and have a simpler solution instead of this, though 🙏

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DATE and DATETIME aren't supported yet (only TIMESTAMP)

But I'm not very keen on defaulting them to Strings, it's not very robust. We should choose a Python native type that is closer to what a DATE/DATETIME actually is.

Some options are discussed in https://s.apache.org/beam-timestamp-strategy (under "Python Nanosecond Support" --> "DateTime"). It'll take more work but I think it's a better longterm solution for Beam.

"DATETIME": str,
"JSON": str,
#TODO(https://github.com/apache/beam/issues/20810):
# Finish mappings for all BQ types
}
Expand Down
293 changes: 293 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_schema_tools_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,299 @@ def test_geography_with_complex_wkt(self):

self.assertEqual(usertype.__annotations__, expected_annotations)

def test_date_type_support(self):
"""Test that DATE type is properly supported in schema conversion."""
fields = [
bigquery.TableFieldSchema(
name='birth_date', type='DATE', mode="NULLABLE"),
bigquery.TableFieldSchema(name='dates', type='DATE', mode="REPEATED"),
bigquery.TableFieldSchema(
name='required_date', type='DATE', mode="REQUIRED")
]
schema = bigquery.TableSchema(fields=fields)

usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema)

expected_annotations = {
'birth_date': typing.Optional[str],
'dates': typing.Sequence[str],
'required_date': str
}

self.assertEqual(usertype.__annotations__, expected_annotations)

def test_date_in_bq_to_python_types_mapping(self):
"""Test that DATE is included in BIG_QUERY_TO_PYTHON_TYPES mapping."""
from apache_beam.io.gcp.bigquery_schema_tools import BIG_QUERY_TO_PYTHON_TYPES

self.assertIn("DATE", BIG_QUERY_TO_PYTHON_TYPES)
self.assertEqual(BIG_QUERY_TO_PYTHON_TYPES["DATE"], str)

def test_date_field_type_conversion(self):
"""Test bq_field_to_type function with DATE fields."""
from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type

# Test required DATE field
result = bq_field_to_type("DATE", "REQUIRED")
self.assertEqual(result, str)

# Test nullable DATE field
result = bq_field_to_type("DATE", "NULLABLE")
self.assertEqual(result, typing.Optional[str])

# Test repeated DATE field
result = bq_field_to_type("DATE", "REPEATED")
self.assertEqual(result, typing.Sequence[str])

# Test DATE field with None mode (should default to nullable)
result = bq_field_to_type("DATE", None)
self.assertEqual(result, typing.Optional[str])

# Test DATE field with empty mode (should default to nullable)
result = bq_field_to_type("DATE", "")
self.assertEqual(result, typing.Optional[str])

def test_convert_to_usertype_with_date(self):
"""Test convert_to_usertype function with DATE fields."""
schema = bigquery.TableSchema(
fields=[
bigquery.TableFieldSchema(
name='id', type='INTEGER', mode="REQUIRED"),
bigquery.TableFieldSchema(
name='birth_date', type='DATE', mode="NULLABLE"),
bigquery.TableFieldSchema(
name='name', type='STRING', mode="REQUIRED")
])

conversion_transform = bigquery_schema_tools.convert_to_usertype(schema)

# Verify the transform is created successfully
self.assertIsNotNone(conversion_transform)

# The transform should be a ParDo with BeamSchemaConversionDoFn
self.assertIsInstance(conversion_transform, beam.ParDo)

def test_beam_schema_conversion_dofn_with_date(self):
"""Test BeamSchemaConversionDoFn with DATE data."""
from apache_beam.io.gcp.bigquery_schema_tools import BeamSchemaConversionDoFn

# Create a user type with DATE field
fields = [
bigquery.TableFieldSchema(name='id', type='INTEGER', mode="REQUIRED"),
bigquery.TableFieldSchema(
name='birth_date', type='DATE', mode="NULLABLE")
]
schema = bigquery.TableSchema(fields=fields)
usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema)

# Create the DoFn
dofn = BeamSchemaConversionDoFn(usertype)

# Test processing a dictionary with DATE data
input_dict = {'id': 1, 'birth_date': '2021-01-15'}

results = list(dofn.process(input_dict))
self.assertEqual(len(results), 1)

result = results[0]
self.assertEqual(result.id, 1)
self.assertEqual(result.birth_date, '2021-01-15')

def test_datetime_type_support(self):
"""Test that DATETIME type is properly supported in schema conversion."""
fields = [
bigquery.TableFieldSchema(
name='event_time', type='DATETIME', mode="NULLABLE"),
bigquery.TableFieldSchema(
name='timestamps', type='DATETIME', mode="REPEATED"),
bigquery.TableFieldSchema(
name='required_time', type='DATETIME', mode="REQUIRED")
]
schema = bigquery.TableSchema(fields=fields)

usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema)

expected_annotations = {
'event_time': typing.Optional[str],
'timestamps': typing.Sequence[str],
'required_time': str
}

self.assertEqual(usertype.__annotations__, expected_annotations)

def test_datetime_in_bq_to_python_types_mapping(self):
"""Test that DATETIME is included in BIG_QUERY_TO_PYTHON_TYPES mapping."""
from apache_beam.io.gcp.bigquery_schema_tools import BIG_QUERY_TO_PYTHON_TYPES

self.assertIn("DATETIME", BIG_QUERY_TO_PYTHON_TYPES)
self.assertEqual(BIG_QUERY_TO_PYTHON_TYPES["DATETIME"], str)

def test_datetime_field_type_conversion(self):
"""Test bq_field_to_type function with DATETIME fields."""
from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type

# Test required DATETIME field
result = bq_field_to_type("DATETIME", "REQUIRED")
self.assertEqual(result, str)

# Test nullable DATETIME field
result = bq_field_to_type("DATETIME", "NULLABLE")
self.assertEqual(result, typing.Optional[str])

# Test repeated DATETIME field
result = bq_field_to_type("DATETIME", "REPEATED")
self.assertEqual(result, typing.Sequence[str])

# Test DATETIME field with None mode (should default to nullable)
result = bq_field_to_type("DATETIME", None)
self.assertEqual(result, typing.Optional[str])

# Test DATETIME field with empty mode (should default to nullable)
result = bq_field_to_type("DATETIME", "")
self.assertEqual(result, typing.Optional[str])

def test_convert_to_usertype_with_datetime(self):
"""Test convert_to_usertype function with DATETIME fields."""
schema = bigquery.TableSchema(
fields=[
bigquery.TableFieldSchema(
name='id', type='INTEGER', mode="REQUIRED"),
bigquery.TableFieldSchema(
name='event_time', type='DATETIME', mode="NULLABLE"),
bigquery.TableFieldSchema(
name='name', type='STRING', mode="REQUIRED")
])

conversion_transform = bigquery_schema_tools.convert_to_usertype(schema)

# Verify the transform is created successfully
self.assertIsNotNone(conversion_transform)

# The transform should be a ParDo with BeamSchemaConversionDoFn
self.assertIsInstance(conversion_transform, beam.ParDo)

def test_beam_schema_conversion_dofn_with_datetime(self):
"""Test BeamSchemaConversionDoFn with DATETIME data."""
from apache_beam.io.gcp.bigquery_schema_tools import BeamSchemaConversionDoFn

# Create a user type with DATETIME field
fields = [
bigquery.TableFieldSchema(name='id', type='INTEGER', mode="REQUIRED"),
bigquery.TableFieldSchema(
name='event_time', type='DATETIME', mode="NULLABLE")
]
schema = bigquery.TableSchema(fields=fields)
usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema)

# Create the DoFn
dofn = BeamSchemaConversionDoFn(usertype)

# Test processing a dictionary with DATETIME data
input_dict = {'id': 1, 'event_time': '2021-01-15T10:30:00'}

results = list(dofn.process(input_dict))
self.assertEqual(len(results), 1)

result = results[0]
self.assertEqual(result.id, 1)
self.assertEqual(result.event_time, '2021-01-15T10:30:00')

def test_json_type_support(self):
"""Test that JSON type is properly supported in schema conversion."""
fields = [
bigquery.TableFieldSchema(name='data', type='JSON', mode="NULLABLE"),
bigquery.TableFieldSchema(name='items', type='JSON', mode="REPEATED"),
bigquery.TableFieldSchema(
name='required_data', type='JSON', mode="REQUIRED")
]
schema = bigquery.TableSchema(fields=fields)

usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema)

expected_annotations = {
'data': typing.Optional[str],
'items': typing.Sequence[str],
'required_data': str
}

self.assertEqual(usertype.__annotations__, expected_annotations)

def test_json_in_bq_to_python_types_mapping(self):
"""Test that JSON is included in BIG_QUERY_TO_PYTHON_TYPES mapping."""
from apache_beam.io.gcp.bigquery_schema_tools import BIG_QUERY_TO_PYTHON_TYPES

self.assertIn("JSON", BIG_QUERY_TO_PYTHON_TYPES)
self.assertEqual(BIG_QUERY_TO_PYTHON_TYPES["JSON"], str)

def test_json_field_type_conversion(self):
"""Test bq_field_to_type function with JSON fields."""
from apache_beam.io.gcp.bigquery_schema_tools import bq_field_to_type

# Test required JSON field
result = bq_field_to_type("JSON", "REQUIRED")
self.assertEqual(result, str)

# Test nullable JSON field
result = bq_field_to_type("JSON", "NULLABLE")
self.assertEqual(result, typing.Optional[str])

# Test repeated JSON field
result = bq_field_to_type("JSON", "REPEATED")
self.assertEqual(result, typing.Sequence[str])

# Test JSON field with None mode (should default to nullable)
result = bq_field_to_type("JSON", None)
self.assertEqual(result, typing.Optional[str])

# Test JSON field with empty mode (should default to nullable)
result = bq_field_to_type("JSON", "")
self.assertEqual(result, typing.Optional[str])

def test_convert_to_usertype_with_json(self):
"""Test convert_to_usertype function with JSON fields."""
schema = bigquery.TableSchema(
fields=[
bigquery.TableFieldSchema(
name='id', type='INTEGER', mode="REQUIRED"),
bigquery.TableFieldSchema(
name='data', type='JSON', mode="NULLABLE"),
bigquery.TableFieldSchema(
name='name', type='STRING', mode="REQUIRED")
])

conversion_transform = bigquery_schema_tools.convert_to_usertype(schema)

# Verify the transform is created successfully
self.assertIsNotNone(conversion_transform)

# The transform should be a ParDo with BeamSchemaConversionDoFn
self.assertIsInstance(conversion_transform, beam.ParDo)

def test_beam_schema_conversion_dofn_with_json(self):
"""Test BeamSchemaConversionDoFn with JSON data."""
from apache_beam.io.gcp.bigquery_schema_tools import BeamSchemaConversionDoFn

# Create a user type with JSON field
fields = [
bigquery.TableFieldSchema(name='id', type='INTEGER', mode="REQUIRED"),
bigquery.TableFieldSchema(name='data', type='JSON', mode="NULLABLE")
]
schema = bigquery.TableSchema(fields=fields)
usertype = bigquery_schema_tools.generate_user_type_from_bq_schema(schema)

# Create the DoFn
dofn = BeamSchemaConversionDoFn(usertype)

# Test processing a dictionary with JSON data
input_dict = {'id': 1, 'data': '{"key": "value", "count": 42}'}

results = list(dofn.process(input_dict))
self.assertEqual(len(results), 1)

result = results[0]
self.assertEqual(result.id, 1)
self.assertEqual(result.data, '{"key": "value", "count": 42}')


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@
"NUMERIC": decimal.Decimal,
"TIMESTAMP": apache_beam.utils.timestamp.Timestamp,
"GEOGRAPHY": str,
"DATE": str,
"DATETIME": str,
"JSON": str,
}


Expand Down
9 changes: 8 additions & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,8 @@ def test_dict_to_beam_row_repeated_nested_record(self):
class TestBeamTypehintFromSchema(unittest.TestCase):
EXPECTED_TYPEHINTS = [("str", str), ("bool", bool), ("bytes", bytes),
("int", np.int64), ("float", np.float64),
("numeric", decimal.Decimal), ("timestamp", Timestamp)]
("numeric", decimal.Decimal), ("timestamp", Timestamp),
("date", str), ("datetime", str), ("json", str)]

def get_schema_fields_with_mode(self, mode):
return [{
Expand All @@ -1033,6 +1034,12 @@ def get_schema_fields_with_mode(self, mode):
"name": "numeric", "type": "NUMERIC", "mode": mode
}, {
"name": "timestamp", "type": "TIMESTAMP", "mode": mode
}, {
"name": "date", "type": "DATE", "mode": mode
}, {
"name": "datetime", "type": "DATETIME", "mode": mode
}, {
"name": "json", "type": "JSON", "mode": mode
}]

def test_typehints_from_required_schema(self):
Expand Down
Loading