Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark: Support singular form of years, months, days, and hours functions #12117

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,24 @@ public void removeTables() {
@TestTemplate
public void testYearsFunctionOnUnpartitionedTable() {
createUnpartitionedTable(spark, tableName);
testYearsFunction(false);
testYearsFunction(false, false);
testYearsFunction(false, true);
}

@TestTemplate
public void testYearsFunctionOnPartitionedTable() {
createPartitionedTable(spark, tableName, "years(ts)");
testYearsFunction(true);
testYearsFunction(true, false);
testYearsFunction(true, true);
}

private void testYearsFunction(boolean partitioned) {
private void testYearsFunction(boolean partitioned, boolean singular) {
int targetYears = timestampStrToYearOrdinal("2017-11-22T00:00:00.000000+00:00");
String functionName = singular ? "year" : "years";
String query =
String.format(
"SELECT * FROM %s WHERE system.years(ts) = %s ORDER BY id", tableName, targetYears);
"SELECT * FROM %s WHERE system.%s(ts) = %s ORDER BY id",
tableName, functionName, targetYears);

Dataset<Row> df = spark.sql(query);
LogicalPlan optimizedPlan = df.queryExecution().optimizedPlan();
Expand All @@ -112,20 +116,24 @@ private void testYearsFunction(boolean partitioned) {
@TestTemplate
public void testMonthsFunctionOnUnpartitionedTable() {
createUnpartitionedTable(spark, tableName);
testMonthsFunction(false);
testMonthsFunction(false, false);
testMonthsFunction(false, true);
}

@TestTemplate
public void testMonthsFunctionOnPartitionedTable() {
createPartitionedTable(spark, tableName, "months(ts)");
testMonthsFunction(true);
testMonthsFunction(true, false);
testMonthsFunction(true, true);
}

private void testMonthsFunction(boolean partitioned) {
private void testMonthsFunction(boolean partitioned, boolean singular) {
int targetMonths = timestampStrToMonthOrdinal("2017-11-22T00:00:00.000000+00:00");
String functionName = singular ? "month" : "months";
String query =
String.format(
"SELECT * FROM %s WHERE system.months(ts) > %s ORDER BY id", tableName, targetMonths);
"SELECT * FROM %s WHERE system.%s(ts) > %s ORDER BY id",
tableName, functionName, targetMonths);

Dataset<Row> df = spark.sql(query);
LogicalPlan optimizedPlan = df.queryExecution().optimizedPlan();
Expand All @@ -140,22 +148,25 @@ private void testMonthsFunction(boolean partitioned) {
@TestTemplate
public void testDaysFunctionOnUnpartitionedTable() {
createUnpartitionedTable(spark, tableName);
testDaysFunction(false);
testDaysFunction(false, false);
testDaysFunction(false, true);
}

@TestTemplate
public void testDaysFunctionOnPartitionedTable() {
createPartitionedTable(spark, tableName, "days(ts)");
testDaysFunction(true);
testDaysFunction(true, false);
testDaysFunction(true, true);
}

private void testDaysFunction(boolean partitioned) {
private void testDaysFunction(boolean partitioned, boolean singular) {
String timestamp = "2018-11-20T00:00:00.000000+00:00";
int targetDays = timestampStrToDayOrdinal(timestamp);
String functionName = singular ? "day" : "days";
String query =
String.format(
"SELECT * FROM %s WHERE system.days(ts) < date('%s') ORDER BY id",
tableName, timestamp);
"SELECT * FROM %s WHERE system.%s(ts) < date('%s') ORDER BY id",
tableName, functionName, timestamp);

Dataset<Row> df = spark.sql(query);
LogicalPlan optimizedPlan = df.queryExecution().optimizedPlan();
Expand All @@ -170,20 +181,24 @@ private void testDaysFunction(boolean partitioned) {
@TestTemplate
public void testHoursFunctionOnUnpartitionedTable() {
createUnpartitionedTable(spark, tableName);
testHoursFunction(false);
testHoursFunction(false, false);
testHoursFunction(false, true);
}

@TestTemplate
public void testHoursFunctionOnPartitionedTable() {
createPartitionedTable(spark, tableName, "hours(ts)");
testHoursFunction(true);
testHoursFunction(true, false);
testHoursFunction(true, true);
}

private void testHoursFunction(boolean partitioned) {
private void testHoursFunction(boolean partitioned, boolean singular) {
int targetHours = timestampStrToHourOrdinal("2017-11-22T06:02:09.243857+00:00");
String functionName = singular ? "hour" : "hours";
String query =
String.format(
"SELECT * FROM %s WHERE system.hours(ts) >= %s ORDER BY id", tableName, targetHours);
"SELECT * FROM %s WHERE system.%s(ts) >= %s ORDER BY id",
tableName, functionName, targetHours);

Dataset<Row> df = spark.sql(query);
LogicalPlan optimizedPlan = df.queryExecution().optimizedPlan();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
public class SparkV2Filters {

public static final Set<String> SUPPORTED_FUNCTIONS =
ImmutableSet.of("years", "months", "days", "hours", "bucket", "truncate");
ImmutableSet.of(
"year", "years", "month", "months", "day", "days", "hour", "hours", "bucket", "truncate");
Comment on lines -73 to +74
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't aware of the hardcoded references to "years", "months", "days" and "hours" in SparkV2Filters.
The changes here in SparkV2Filters are only necessary if we want to make the singular forms the standard.
Right now, I have left the plural forms as standard.
@rdblue should we make the singular form of the functions the standard?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @wypoon, thanks for working on this. Initially, we went for the plural form because the singular form is already a function in Spark (e.g. day). Do you know if these interfere? We should also make sure that we convert this in the tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fokko thanks for looking at this, and for the explanation of why the plural form was used.
There is no interference between the Iceberg functions and the Spark built-in functions, because the Iceberg functions use the "system" namespace. I added tests for the built-in functions to demonstrate this.
There is one thing that gave me pause, which is the comment in BaseCatalog::isFunctionNamespace:

    // Allow for empty namespace, as Spark's storage partitioned joins look up
    // the corresponding functions to generate transforms for partitioning
    // with an empty namespace, such as `bucket`.
    // Otherwise, use `system` namespace.

For this reason, I added some variants to TestStoragePartitionJoins to use the singular forms of the transforms. Those tests pass too.


private static final String TRUE = "ALWAYS_TRUE";
private static final String FALSE = "ALWAYS_FALSE";
Expand Down Expand Up @@ -455,12 +456,16 @@ private static UnboundTerm<Object> udfToTerm(UserDefinedScalarFunc udf) {
if (isRef(child)) {
String column = SparkUtil.toColumnName((NamedReference) child);
switch (udfName) {
case "year":
case "years":
return year(column);
case "month":
case "months":
return month(column);
case "day":
case "days":
return day(column);
case "hour":
case "hours":
return hour(column);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,29 @@
* A Spark function implementation for the Iceberg day transform.
*
* <p>Example usage: {@code SELECT system.days('source_col')}.
*
* <p>Alternate form: {@code SELECT system.day('source_col')}.
*/
public class DaysFunction extends UnaryUnboundFunction {
Copy link
Contributor Author

@wypoon wypoon Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ebyhr if I understand you correctly, you also suggest introducing a constructor that takes a String name and then having name() return this name, right?
So new DaysFunction("day") would return "day" when its name() is called, while new DaysFunction("days") would return "days".
I think that introduces more complexity than it's worth.

Copy link
Contributor Author

@wypoon wypoon Jan 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want the function to be constructed with an arbitrary name. We really just want the name to be either "day" or "days" in this case. So we can have a constructor with a boolean.


private boolean singular;

public DaysFunction() {
this(false);
}

DaysFunction(boolean singular) {
this.singular = singular;
}

@Override
protected BoundFunction doBind(DataType valueType) {
if (valueType instanceof DateType) {
return new DateToDaysFunction();
return new DateToDaysFunction(singular);
} else if (valueType instanceof TimestampType) {
return new TimestampToDaysFunction();
return new TimestampToDaysFunction(singular);
} else if (valueType instanceof TimestampNTZType) {
return new TimestampNtzToDaysFunction();
return new TimestampNtzToDaysFunction(singular);
} else {
throw new UnsupportedOperationException(
"Expected value to be date or timestamp: " + valueType.catalogString());
Expand All @@ -57,13 +69,19 @@ public String description() {

@Override
public String name() {
return "days";
return singular ? "day" : "days";
}

private abstract static class BaseToDaysFunction extends BaseScalarFunction<Integer> {
private boolean singular;

protected BaseToDaysFunction(boolean singular) {
this.singular = singular;
}

@Override
public String name() {
return "days";
return singular ? "day" : "days";
}

@Override
Expand All @@ -74,6 +92,14 @@ public DataType resultType() {

// Spark and Iceberg internal representations of dates match so no transformation is required
public static class DateToDaysFunction extends BaseToDaysFunction {
public DateToDaysFunction() {
this(false);
}

DateToDaysFunction(boolean singular) {
super(singular);
}

// magic method used in codegen
public static int invoke(int days) {
return days;
Expand All @@ -86,7 +112,7 @@ public DataType[] inputTypes() {

@Override
public String canonicalName() {
return "iceberg.days(date)";
return "iceberg." + name() + "(date)";
}

@Override
Expand All @@ -97,6 +123,14 @@ public Integer produceResult(InternalRow input) {
}

public static class TimestampToDaysFunction extends BaseToDaysFunction {
public TimestampToDaysFunction() {
this(false);
}

TimestampToDaysFunction(boolean singular) {
super(singular);
}

// magic method used in codegen
public static int invoke(long micros) {
return DateTimeUtil.microsToDays(micros);
Expand All @@ -109,7 +143,7 @@ public DataType[] inputTypes() {

@Override
public String canonicalName() {
return "iceberg.days(timestamp)";
return "iceberg." + name() + "(timestamp)";
}

@Override
Expand All @@ -120,6 +154,14 @@ public Integer produceResult(InternalRow input) {
}

public static class TimestampNtzToDaysFunction extends BaseToDaysFunction {
public TimestampNtzToDaysFunction() {
this(false);
}

TimestampNtzToDaysFunction(boolean singular) {
super(singular);
}

// magic method used in codegen
public static int invoke(long micros) {
return DateTimeUtil.microsToDays(micros);
Expand All @@ -132,7 +174,7 @@ public DataType[] inputTypes() {

@Override
public String canonicalName() {
return "iceberg.days(timestamp_ntz)";
return "iceberg." + name() + "(timestamp_ntz)";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,27 @@
* A Spark function implementation for the Iceberg hour transform.
*
* <p>Example usage: {@code SELECT system.hours('source_col')}.
*
* <p>Alternate form: {@code SELECT system.hour('source_col')}.
*/
public class HoursFunction extends UnaryUnboundFunction {

private boolean singular;

public HoursFunction() {
this(false);
}

HoursFunction(boolean singular) {
this.singular = singular;
}

@Override
protected BoundFunction doBind(DataType valueType) {
if (valueType instanceof TimestampType) {
return new TimestampToHoursFunction();
return new TimestampToHoursFunction(singular);
} else if (valueType instanceof TimestampNTZType) {
return new TimestampNtzToHoursFunction();
return new TimestampNtzToHoursFunction(singular);
} else {
throw new UnsupportedOperationException(
"Expected value to be timestamp: " + valueType.catalogString());
Expand All @@ -54,18 +66,28 @@ public String description() {

@Override
public String name() {
return "hours";
return singular ? "hour" : "hours";
}

public static class TimestampToHoursFunction extends BaseScalarFunction<Integer> {
private boolean singular;

public TimestampToHoursFunction() {
this(false);
}

TimestampToHoursFunction(boolean singular) {
this.singular = singular;
}

// magic method used in codegen
public static int invoke(long micros) {
return DateTimeUtil.microsToHours(micros);
}

@Override
public String name() {
return "hours";
return singular ? "hour" : "hours";
}

@Override
Expand All @@ -80,7 +102,7 @@ public DataType resultType() {

@Override
public String canonicalName() {
return "iceberg.hours(timestamp)";
return "iceberg." + name() + "(timestamp)";
}

@Override
Expand All @@ -91,14 +113,24 @@ public Integer produceResult(InternalRow input) {
}

public static class TimestampNtzToHoursFunction extends BaseScalarFunction<Integer> {
private boolean singular;

public TimestampNtzToHoursFunction() {
this(false);
}

TimestampNtzToHoursFunction(boolean singular) {
this.singular = singular;
}

// magic method used in codegen
public static int invoke(long micros) {
return DateTimeUtil.microsToHours(micros);
}

@Override
public String name() {
return "hours";
return singular ? "hour" : "hours";
}

@Override
Expand All @@ -113,7 +145,7 @@ public DataType resultType() {

@Override
public String canonicalName() {
return "iceberg.hours(timestamp_ntz)";
return "iceberg." + name() + "(timestamp_ntz)";
}

@Override
Expand Down
Loading