Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

### Snowpark Python API Updates

#### New Features

#### Bug Fixes

- Fixed a bug in `Session.client_telemetry` where the trace did not have a Snowflake-style trace id.
- Fixed a bug where saving an FDN table into an Iceberg table in overwrite mode raised an error because `StringType` was saved with the wrong length.

## 1.47.0 (TBD)

Expand Down
6 changes: 4 additions & 2 deletions src/snowflake/snowpark/_internal/analyzer/analyzer_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1994,11 +1994,13 @@ def drop_table_if_exists_statement(table_name: str) -> str:
return DROP + TABLE + IF + EXISTS + table_name


def attribute_to_schema_string(attributes: List[Attribute]) -> str:
def attribute_to_schema_string(
    attributes: List[Attribute], is_iceberg: Optional[bool] = False
) -> str:
    """Render a list of attributes as a comma-separated SQL schema string.

    Each attribute is emitted as ``<name> <type>`` with a trailing
    ``NOT NULL`` when the attribute is non-nullable.  ``is_iceberg`` is
    forwarded to the type conversion so Iceberg-specific type sizing
    (e.g. max-length STRING) can be applied.
    """
    pieces = []
    for attr in attributes:
        sf_type = convert_sp_to_sf_type(
            attr.datatype, attr.nullable, is_iceberg=is_iceberg
        )
        # Nullable columns carry no constraint suffix.
        suffix = EMPTY_STRING if attr.nullable else NOT_NULL
        pieces.append(attr.name + SPACE + sf_type + suffix)
    return COMMA.join(pieces)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1336,9 +1336,10 @@ def save_as_table(
# in save as table. So we rename ${number} with COL{number}.
hidden_column_pattern = r"\"\$(\d+)\""
column_definition = None
is_iceberg = True if iceberg_config is not None else False
if child_attributes is not None:
column_definition_with_hidden_columns = attribute_to_schema_string(
child_attributes or []
child_attributes or [], is_iceberg=is_iceberg
)
column_definition = re.sub(
hidden_column_pattern,
Expand Down
8 changes: 7 additions & 1 deletion src/snowflake/snowpark/_internal/type_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@
except ImportError:
ResultMetadataV2 = ResultMetadata

_MAX_ICEBERG_STRING_SIZE = 134217728


def convert_metadata_to_sp_type(
metadata: Union[ResultMetadata, "ResultMetadataV2"],
Expand Down Expand Up @@ -318,7 +320,9 @@ def convert_sf_to_sp_type(
)


def convert_sp_to_sf_type(datatype: DataType, nullable_override=None) -> str:
def convert_sp_to_sf_type(
datatype: DataType, nullable_override=None, is_iceberg: Optional[bool] = False
) -> str:
if context._is_snowpark_connect_compatible_mode:
if isinstance(datatype, _IntegralType) and datatype._precision is not None:
return f"NUMBER({datatype._precision}, 0)"
Expand All @@ -341,6 +345,8 @@ def convert_sp_to_sf_type(datatype: DataType, nullable_override=None) -> str:
# We regard NullType as String, which is required when creating
# a dataframe from local data with all None values
if isinstance(datatype, StringType):
if is_iceberg:
return f"STRING({_MAX_ICEBERG_STRING_SIZE})"
if datatype.length:
return f"STRING({datatype.length})"
return "STRING"
Expand Down
14 changes: 0 additions & 14 deletions tests/integ/scala/test_dataframe_writer_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,6 @@ def test_iceberg(session, local_testing_mode):
if not iceberg_supported(session, local_testing_mode) or is_in_stored_procedure():
pytest.skip("Test requires iceberg support.")

session.sql(
"alter session set FEATURE_INCREASED_MAX_LOB_SIZE_PERSISTED=DISABLED"
).collect()
session.sql(
"alter session set FEATURE_INCREASED_MAX_LOB_SIZE_IN_MEMORY=DISABLED"
).collect()

table_name = Utils.random_table_name()
df = session.create_dataframe(
[],
Expand Down Expand Up @@ -280,13 +273,6 @@ def test_iceberg_partition_by(session, local_testing_mode):
if not iceberg_supported(session, local_testing_mode) or is_in_stored_procedure():
pytest.skip("Test requires iceberg support.")

session.sql(
"alter session set FEATURE_INCREASED_MAX_LOB_SIZE_PERSISTED=DISABLED"
).collect()
session.sql(
"alter session set FEATURE_INCREASED_MAX_LOB_SIZE_IN_MEMORY=DISABLED"
).collect()

df = session.create_dataframe(
[],
schema=StructType(
Expand Down
70 changes: 70 additions & 0 deletions tests/integ/scala/test_datatype_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,76 @@ def test_iceberg_nested_fields(
Utils.drop_table(structured_type_session, transformed_table_name)


@pytest.mark.skipif(
    "config.getoption('local_testing_mode', default=False)",
    reason="local testing does not fully support iceberg tables yet.",
)
def test_iceberg_string_columns_use_max_size(session, local_testing_mode):
    """Overwriting an Iceberg table from an FDN source must emit max-size STRING.

    Reproduces the bug where a `StringType(16777216)` column read from a
    regular (FDN) table was carried verbatim into the `CREATE ICEBERG TABLE`
    statement generated during an overwrite `save_as_table`, instead of the
    Iceberg maximum string length (134217728).  Verifies via query history
    that the generated DDL uses the max size.
    """
    if not iceberg_supported(session, local_testing_mode):
        pytest.skip("Test requires iceberg support.")

    fdn_src_table = Utils.random_table_name()
    iceberg_dest_table = Utils.random_table_name()
    # Unique base_location per run so repeated runs don't collide on the
    # external volume.
    base_location = (
        f"{ICEBERG_CONFIG['base_location']}/ice_varchar_repro_{uuid.uuid4().hex}"
    )
    iceberg_config = dict(ICEBERG_CONFIG)
    iceberg_config["base_location"] = base_location

    try:
        # 1) Create destination Iceberg table.
        session.sql(
            f"""
            CREATE OR REPLACE ICEBERG TABLE {iceberg_dest_table} (
                NAME STRING, AGE INTEGER
            )
            CATALOG = '{ICEBERG_CONFIG["catalog"]}'
            EXTERNAL_VOLUME = '{ICEBERG_CONFIG["external_volume"]}'
            BASE_LOCATION = '{base_location}';
            """
        ).collect()

        # 2) Create regular (FDN) source table.
        session.sql(
            f"CREATE OR REPLACE TABLE {fdn_src_table} (NAME VARCHAR, AGE INTEGER)"
        ).collect()
        session.sql(f"INSERT INTO {fdn_src_table} VALUES ('Bob', 25)").collect()

        # 3) Read from normal table: VARCHAR maps to StringType(length=16777216).
        fdn_df = session.table(fdn_src_table)
        name_field = next(f for f in fdn_df.schema.fields if f.name.upper() == "NAME")
        assert isinstance(name_field.datatype, StringType)
        assert name_field.datatype.length == MAX_TABLE_STRING_SIZE

        # 4) Overwrite Iceberg table using iceberg_config: STRING should be
        # max-sized.  Capture query history to inspect the generated DDL.
        with session.query_history() as query_history:
            fdn_df.write.mode("overwrite").save_as_table(
                iceberg_dest_table, column_order="name", iceberg_config=iceberg_config
            )

        # Normalize/uppercase so the assertions are whitespace- and
        # case-insensitive.
        normalized_sqls = [
            Utils.normalize_sql(q.sql_text).upper() for q in query_history.queries
        ]
        create_iceberg_sqls = [
            s for s in normalized_sqls if "CREATE" in s and "ICEBERG TABLE" in s
        ]
        assert create_iceberg_sqls, (
            "Expected Snowpark to issue a CREATE ... ICEBERG TABLE during overwrite, "
            f"but none was found. Queries were: {normalized_sqls}"
        )
        # Column name may or may not be quoted in the generated SQL, so
        # accept both forms.
        assert any(
            ("NAME STRING(134217728)" in s or '"NAME" STRING(134217728)' in s)
            for s in create_iceberg_sqls
        ), (
            "Expected Iceberg CREATE TABLE to use max string size 134217728 for NAME, "
            f"but got: {create_iceberg_sqls}"
        )
        # Regression guard: the old (buggy) FDN max length must not appear.
        assert not any("NAME STRING(16777216)" in s for s in create_iceberg_sqls)
    finally:
        # Best-effort cleanup of both tables regardless of test outcome.
        Utils.drop_table(session, fdn_src_table)
        Utils.drop_table(session, iceberg_dest_table)


@pytest.mark.xfail(
"config.getoption('local_testing_mode', default=False)",
reason="local testing does not fully support structured types yet.",
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,8 @@ def test_convert_sp_to_sf_type():
assert convert_sp_to_sf_type(DoubleType()) == "DOUBLE"
assert convert_sp_to_sf_type(StringType()) == "STRING"
assert convert_sp_to_sf_type(StringType(77)) == "STRING(77)"
assert convert_sp_to_sf_type(StringType(), is_iceberg=True) == "STRING(134217728)"
assert convert_sp_to_sf_type(StringType(77), is_iceberg=True) == "STRING(134217728)"
assert convert_sp_to_sf_type(NullType()) == "STRING"
assert convert_sp_to_sf_type(BooleanType()) == "BOOLEAN"
assert convert_sp_to_sf_type(DateType()) == "DATE"
Expand Down
Loading