Conversation

@jqin61 jqin61 commented Apr 5, 2024

Background
When we are doing a static overwrite, we could choose to overwrite the full table or overwrite some partitions of the table.

The Spark SQL counterpart for a full-table static overwrite in Iceberg is:

INSERT OVERWRITE prod.my_app.logs
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE level = 'INFO'
GROUP BY uuid

And the Spark SQL counterpart for overwriting a specified partition is:

INSERT OVERWRITE prod.my_app.logs
PARTITION (level = 'INFO')
SELECT uuid, first(ts), first(message)
FROM prod.my_app.logs
WHERE level = 'INFO'
GROUP BY uuid

Goal
When we overwrite the table, we can provide an expression as overwrite_filter to cover these two cases. This PR validates that the filter expression conforms to certain rules: it must reference partition columns that do not use hidden partitioning, and the fields in the filter must agree with the input arrow table so that the dataframe does not contain values the filter does not specify.

Rules and Test Cases

  1. Rule : The expression may only use IsNull or EqualTo as building blocks, combined with And
    Tests : test__validate_static_overwrite_filter_expr_type parametrize 1-8

  2. Rule : The building block predicates (IsNull and EqualTo) should not have conflicting values.
    Tests : test__validate_static_overwrite_filter_expr_type parametrize 9-11

  3. Rule : The terms (fields) should refer to existing fields in the iceberg schema, and the literal in the predicate (if any) should match the iceberg field type. In other words, the expression must bind successfully against the table schema.
    Tests :
    test__bind_and_validate_static_overwrite_filter_predicate_fails_on_non_schema_fields_in_filter
    test__bind_and_validate_static_overwrite_filter_predicate_fails_to_bind_due_to_incompatible_predicate_value

  4. Rule : If the expression specifies a field which is required in the iceberg schema, that field cannot be IsNull in the expression.
    Tests : test__bind_and_validate_static_overwrite_filter_predicate_fails_to_bind_due_to_non_nullable

  5. Rule : The fields in the expression should be within partition columns
    Tests : test__bind_and_validate_static_overwrite_filter_predicate_fails_on_non_part_fields_in_filter

  6. Rule : The iceberg table fields specified in the expression must not use hidden partitioning (non-identity transforms); the non-specified fields may.
    Tests :
    test__bind_and_validate_static_overwrite_filter_predicate_fails_on_non_identity_transorm_filter
    test__bind_and_validate_static_overwrite_filter_predicate_succeeds_on_an_identity_transform_field_although_table_has_other_hidden_partition_fields

  7. Rule : The partition column values in the dataframe should conform to the filter. (To be implemented in the static overwrite function when partition keys are extracted.)
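The shape check in Rule 1 can be sketched as a recursive walk over the expression tree. The classes below are simplified stdlib stand-ins for pyiceberg's expression types, not the real API:

```python
from dataclasses import dataclass
from typing import Union

# Hypothetical stand-ins for pyiceberg's expression classes, for illustration only.
@dataclass
class EqualTo:
    field: str
    value: object

@dataclass
class IsNull:
    field: str

@dataclass
class And:
    left: "Expr"
    right: "Expr"

Expr = Union[EqualTo, IsNull, And]

def validate_filter_shape(expr: Expr) -> None:
    """Rule 1: only EqualTo / IsNull leaves, joined by And."""
    if isinstance(expr, And):
        validate_filter_shape(expr.left)
        validate_filter_shape(expr.right)
    elif not isinstance(expr, (EqualTo, IsNull)):
        raise ValueError(f"Unsupported predicate in overwrite filter: {expr!r}")

# passes silently: And of EqualTo/IsNull leaves only
validate_filter_shape(And(EqualTo("foo", "hello"), IsNull("baz")))
```

Anything else (an Or node, an inequality, a bare value) would raise at the leaf check.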

Rule Necessity Justification using Spark Counterparts
To better understand these rules, let us show the Spark static overwrite failure counterparts. We use the following setup:

# Create Spark Dataframe
from pyspark.sql.types import StructType, StructField, StringType, LongType
data_multicols = [(2, "Flamingo", "red"), (4, "Horse", "white"), (4, "Pig", "pink")]
schema = StructType([
    StructField("n_legs", LongType(), nullable=True),
    StructField("animals", StringType(), nullable=True),
    StructField("color", StringType(), nullable=True)
])
df_multicols = spark.createDataFrame(data_multicols, schema)

# Create Iceberg Table
create_sql = """CREATE TABLE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols ( 
    n_legs bigint, 
    animals string,
    color string) 
USING iceberg
PARTITIONED BY (n_legs, color)
"""
spark.sql(create_sql)

# Insert Initial data
df_multicols.createOrReplaceTempView("tmp_view")
sql_cmd = f"""INSERT INTO
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    SELECT * FROM  tmp_view
    """
spark.sql(sql_cmd)

This gives the following table schema:

col_name   data_type   comment
n_legs     bigint
animals    string
color      string

Partitioning:
Part 0     n_legs
Part 1     color

with the following data:

n_legs   animals    color
2        Flamingo   red
4        Horse      white
4        Pig        pink

Now let us check the rules.
Rule 1. The expression may only use IsNull or EqualTo as building blocks, combined with And.
For example:

And(EqualTo(Reference("foo"), "hello"), And(IsNull(Reference("baz")), EqualTo(Reference("boo"), "hello")))

or
foo = 'hello' AND (baz IS NULL AND boo = 'hello')

Spark counterpart example:

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (n_legs > 2)
    SELECT color,animals FROM  tmp_view
    """
spark.sql(sql_cmd)

gives:

mismatched input '>' expecting {')', ','}(line 3, pos 22)

== SQL ==
INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (n_legs > 2)
----------------------^^^
    SELECT color,animals FROM  tmp_view

Other predicates such as IN or != and other expressions such as Or give similar errors.

Rule 2. The building block predicates (IsNull and EqualTo) should not have conflicting values.
This means

And(EqualTo(Reference("foo"), "hello"), EqualTo(Reference("foo"), "bye"))

and

And(EqualTo(Reference("foo"), "hello"), IsNull(Reference("foo")))

are not allowed.

However,

And(EqualTo(Reference("foo"), "hello"), EqualTo(Reference("foo"), "hello"))

is allowed and shall be deduplicated.
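As a sketch of how Rule 2 could be enforced (names are hypothetical, not the actual PyIceberg code), model each predicate as a (field, value) pair, representing IsNull as value None:

```python
def check_conflicts(predicates):
    # Rule 2 sketch: allow at most one value per field. EqualTo carries its
    # literal; IsNull is modeled as the value None, so EqualTo("foo", "hello")
    # plus IsNull("foo") also counts as a conflict. Exact duplicates are dropped.
    seen = {}
    deduped = []
    for field, value in predicates:
        if field in seen:
            if seen[field] != value:
                raise ValueError(f"Conflicting predicates for field {field!r}")
            continue  # exact duplicate: deduplicate it
        seen[field] = value
        deduped.append((field, value))
    return deduped

check_conflicts([("foo", "hello"), ("foo", "hello")])
# -> [("foo", "hello")]
```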

Spark counterpart example:

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (color='red', color='green')
    SELECT animals,n_legs FROM  tmp_view
    """
spark.sql(sql_cmd)

gives

ParseException: 
Found duplicate keys 'color'.(line 3, pos 4)

== SQL ==
INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (color='red', color='green')
----^^^
    SELECT animals,n_legs FROM  tmp_view

Rule 3. The terms (fields) should refer to existing fields in the iceberg schema, and the literal in the predicate (if any) should match the iceberg field type. In other words, the expression must bind successfully against the table schema.

Spark counterpart example:

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (not_a_field='red')
    SELECT animals,n_legs FROM  tmp_view
    """
spark.sql(sql_cmd)

gives:

AnalysisException: PARTITION clause cannot contain a non-partition column name: not_a_field

Rule 4. If the expression specifies a field which is required in the iceberg schema, that field cannot be IsNull in the expression.

Spark counterpart example:

# Create Spark Dataframe with non-nullable column
from pyspark.sql.types import StructType, StructField, StringType, LongType
data_multicols = [(2, "Flamingo", "red"), (4, "Horse", "white"), (4, "Pig", "pink")]
schema = StructType([
    StructField("n_legs", LongType(), nullable=True),
    StructField("animals", StringType(), nullable=True),
    StructField("color", StringType(), nullable=False)  # Mark as non-nullable
])
df_multicols = spark.createDataFrame(data_multicols, schema)

# Create Iceberg Table with non-nullable column
create_sql = """CREATE TABLE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols ( 
    n_legs bigint, 
    animals string,
    color string not NULL) 
USING iceberg
PARTITIONED BY (n_legs, color)
"""
spark.sql(create_sql)

# Insert Initial data
df_multicols.createOrReplaceTempView("tmp_view")
sql_cmd = f"""INSERT INTO
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    SELECT * FROM  tmp_view
    """
spark.sql(sql_cmd)

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (color=null)
    SELECT animals, n_legs FROM  tmp_view
    """
spark.sql(sql_cmd)

gives:

AnalysisException: Cannot write incompatible data to table 'lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols':
- Cannot safely cast 'n_legs': string to bigint
- Cannot write nullable values to non-null column 'color'

Rule 5. The fields in the expression should be within the partition columns.
Spark counterpart example:

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (animals='pig')
    SELECT n_legs, color FROM  tmp_view
    """
spark.sql(sql_cmd)

gives:

AnalysisException: PARTITION clause cannot contain a non-partition column name: animals

Rule 6. The iceberg table fields specified in the expression must not use hidden partitioning (non-identity transforms); the non-specified fields may.

Spark counterpart example:

create_sql = """CREATE TABLE lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols_with_transforms ( 
    n_legs bigint, 
    animals string,
    color string
) 
USING iceberg
PARTITIONED BY (n_legs, truncate(color, 1))
"""
spark.sql(create_sql)

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols_with_transforms
    PARTITION (color='red')
    SELECT n_legs, animals FROM  tmp_view
    """
spark.sql(sql_cmd)

gives:

AnalysisException: PARTITION clause cannot contain a non-partition column name: color

However, if we specify the other partition column with identity transform, it works:

sql_cmd = f"""INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols_with_transforms
    PARTITION (n_legs=1)
    SELECT color, animals FROM  tmp_view
    """
spark.sql(sql_cmd)

@jqin61 jqin61 marked this pull request as draft April 5, 2024 20:57
@jqin61 jqin61 marked this pull request as ready for review April 5, 2024 21:52
@Fokko Fokko self-requested a review April 6, 2024 21:35
Fokko commented Apr 8, 2024

Hi Adrian, thanks for working on this and the very comprehensive write-up. My first question is: what is the main goal of this PR?

Let me elaborate with more context. Looking at the Spark syntax, this originated in the Hive era and assumes that there is a single partition spec on the table. With Iceberg, the partitioning can evolve over time, and therefore older partitioning strategies can also be present.

CREATE TABLE prod.my_app.logs (
	env        STRING,
	logline    STRING,
	created_at TIMESTAMP_NTZ -- Nobody likes timezones
) 
USING iceberg
PARTITIONED BY (months(created_at))

-- Later on, when there is more data
ALTER TABLE prod.my_app.logs
REPLACE PARTITION FIELD dt_month WITH days(created_at)

-- Or, when we want to split the logs per environment
ALTER TABLE prod.my_app.logs
ADD PARTITION FIELD env

There is a fair chance that we have to rewrite actual data. When the spec is updated, the table is not fully rewritten right away; otherwise it would be very costly to update the partition spec on big tables. If older data is still partitioned monthly and you update data using the new (daily) partitioning, Iceberg will read in the Parquet files that match the filter. Say you run INSERT OVERWRITE PARTITION (created_at='2024-04-08'): it will read in data that was written under the monthly partitioning, filter out the rows for 2024-04-08, and write back the remaining data. The new data is then appended to the table using the new partitioning scheme.

Example spark sql counterpart in iceberg spark static overwrite for full table overwrite is

I think the top one is a dynamic overwrite since it does not explicitly call out the partition that it will overwrite.

In that case we should compute all the affected partition values by applying the current partition spec on the dataframe. Based on that we can first delete the affected partitions, and then append the data.

def overwrite_dynamic(df: pa.Table) -> None:
    ...
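The "compute all the affected partition values" step could be sketched in plain Python, treating the dataframe as a list of row dicts (the function name is illustrative, not PyIceberg API):

```python
def affected_partitions(rows, partition_cols):
    # Collect the distinct partition tuples present in the incoming data;
    # a dynamic overwrite would delete exactly these partitions, then append.
    return {tuple(row[col] for col in partition_cols) for row in rows}

rows = [
    {"n_legs": 2, "animals": "Flamingo", "color": "red"},
    {"n_legs": 4, "animals": "Horse", "color": "white"},
    {"n_legs": 4, "animals": "Pig", "color": "pink"},
]
affected_partitions(rows, ["n_legs", "color"])
# -> {(2, 'red'), (4, 'white'), (4, 'pink')}
```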

The second one looks like a static overwrite:

def overwrite(df: pa.Table, expr: Union[BooleanExpression, str]) -> None:
    ...


overwrite(df, "level = 'INFO'")

I agree that this is a bit odd:

mismatched input '>' expecting {')', ','}(line 3, pos 22)

== SQL ==
INSERT OVERWRITE
    lacus.test.spark_staticoverwrite_partition_clause_and_data_reltship_multicols
    PARTITION (n_legs > 2)
----------------------^^^
    SELECT color,animals FROM  tmp_view

Open questions:

  • But in Iceberg there is no technical reason not to allow this. Do we want to prevent the user from doing this?
  • Should we add an option that will blow up when we have to rewrite files? If you write your predicates correctly, and you don't evolve the partition spec, then no parquet files need to be opened and your overwrites will be very efficient.

@@ -1776,7 +1776,10 @@ def write_parquet(task: WriteTask) -> DataFile:
 fo = io.new_output(file_path)
 with fo.create(overwrite=True) as fos:
     with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
-        writer.write(pa.Table.from_batches(task.record_batches), row_group_size=row_group_size)
+        arrow_table = pa.Table.from_batches(task.record_batches)
+        # align the columns accordingly in case input arrow table has columns in order different from iceberg table
Can you provide an example of when this would happen? This only handles top-level columns.


sungwy commented Apr 8, 2024

Hi @Fokko @adrianqin I think the goal of this PR is to draw a distinction between the semantics of a 'static overwrite' onto a partitioned table and those of a 'delete' + 'append'.

As we discussed in the last PyIceberg sync, if we believe that a partitioned table overwrite should maintain the expectation of comparing the values provided in the overwrite_filter to the values in the arrow table, then I think we'd need these validations. We would first need to run them on the predicate expression provided in the overwrite_filter so that we can then compare against the values in the arrow table.

In the community sync, we discussed whether the community was in favor of drawing a distinction between delete + append versus overwrite, and I think we all gravitated somewhat towards the idea of validating the predicate expression against the table values in partitioned table overwrites.

For example, where dt and level are both partition columns:

overwrite_filter = "level = 'INFO' AND dt = '2024-03-25'"

# expected behavior -> deletes partition level = 'INFO' and dt = '2024-03-25'

from datetime import date
import pyarrow as pa

df = pa.Table.from_pylist(
   [
       {"level": "INFO", "msg": "hi", "dt": date(2024, 3, 26)},
       {"level": "ERROR", "msg": "bye", "dt": date(2024, 3, 26)},
   ],
)

tbl.overwrite(df, overwrite_filter)

If we wanted to handle the validation only in the delete function, by checking whether we would end up rewriting files, the above pattern would succeed: it would delete level = 'INFO' and dt = '2024-03-25' because this deletion is a pure metadata operation. We would then add new data files for level = 'INFO' and dt = '2024-03-26' and level = 'ERROR' and dt = '2024-03-26'.

Static overwrite on the other hand, would eagerly validate the predicate expression against the table schema, and the values in the arrow table and throw instead.

Please let me know what you think - I think figuring out how static overwrite semantically differs from delete + append (or not) is crucial to understanding how we want to shape this API.


Fokko commented Apr 8, 2024

If we wanted to handle the validation only in the delete function, by checking whether we would end up rewriting files, the above pattern would succeed by deleting level = 'INFO' and dt = '2024-03-25' because this deletion is a pure metadata operation.

This is what I tried to explain in the comment earlier above: sometimes you have to do rewrites because some rows written under an earlier partitioning strategy still match. I'm adding that currently in #569. A table can have older manifests that were written using an older partition spec.

Static overwrite on the other hand, would eagerly validate the predicate expression against the table schema, and the values in the arrow table and throw instead.

I missed this part. We can add it, but I would say that it is up to the user. To simplify, this means doing the following additional check (pseudocode):

def overwrite(df: pa.Table, overwrite_filter: Union[str, BooleanExpression]) -> None:
    row_filter = _parse_row_filter(overwrite_filter)  # Turns the str into a boolean expression
    pa_row_filter = expression_to_pyarrow(row_filter)
    num_invalid_rows = len(df) - len(df.filter(pa_row_filter))
    if num_invalid_rows > 0:
        raise ValueError(f"Found {num_invalid_rows} rows that don't match the overwrite predicate")
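A runnable stdlib approximation of this check, with the dataframe as a list of row dicts and the filter as a plain Python callable (names are illustrative, not the PyIceberg API):

```python
def validate_rows_match_filter(rows, row_filter):
    # Eager validation sketch: every incoming row must satisfy the overwrite
    # filter, here modeled as a plain Python callable over a row dict.
    num_invalid = sum(1 for row in rows if not row_filter(row))
    if num_invalid > 0:
        raise ValueError(f"Found {num_invalid} rows that don't match the overwrite predicate")

# passes; a row with level='ERROR' would raise instead
validate_rows_match_filter(
    [{"level": "INFO", "msg": "hi"}],
    lambda row: row["level"] == "INFO",
)
```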


jqin61 commented Apr 9, 2024

A delete + append will do the static overwrite, and most of the validation will be done by delete itself. Closing the PR; I will open a static overwrite PR when DELETE and MERGE_APPEND are ready, with the following two flags:

  1. allow_rewrite_datafiles - the flag is propagated to 'delete', and an exception is thrown if datafile rewrites are needed
  2. validate_input_data - input_data is validated as suggested: Validate overwrite filter #582 (comment)

@jqin61 jqin61 closed this Apr 9, 2024