Skip to content

Commit

Permalink
Allow configuring parquet_bloom_filter_columns in Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Dec 25, 2024
1 parent c6c77cf commit bddec49
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IsolationLevel;
Expand Down Expand Up @@ -281,12 +282,14 @@
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.OBJECT_STORE_LAYOUT_ENABLED_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation;
import static io.trino.plugin.iceberg.IcebergUtil.buildPath;
import static io.trino.plugin.iceberg.IcebergUtil.canEnforceColumnConstraintInSpecs;
import static io.trino.plugin.iceberg.IcebergUtil.checkFormatForProperty;
import static io.trino.plugin.iceberg.IcebergUtil.commit;
import static io.trino.plugin.iceberg.IcebergUtil.createColumnHandle;
import static io.trino.plugin.iceberg.IcebergUtil.deserializePartitionValue;
Expand Down Expand Up @@ -370,6 +373,7 @@
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
Expand All @@ -392,6 +396,7 @@ public class IcebergMetadata
.add(FORMAT_VERSION_PROPERTY)
.add(OBJECT_STORE_LAYOUT_ENABLED_PROPERTY)
.add(DATA_LOCATION_PROPERTY)
.add(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY)
.add(PARTITIONING_PROPERTY)
.add(SORTED_BY_PROPERTY)
.build();
Expand Down Expand Up @@ -2273,6 +2278,21 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
extraProperties.forEach(updateProperties::set);
}

// Apply a change to the parquet_bloom_filter_columns table property (ALTER TABLE ... SET PROPERTIES).
if (properties.containsKey(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY)) {
// Bloom filter columns only apply to Parquet storage; reject the property for other file formats.
checkFormatForProperty(getFileFormat(icebergTable).toIceberg(), FileFormat.PARQUET, PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY);
//noinspection unchecked
List<String> parquetBloomFilterColumns = (List<String>) properties.get(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY)
.orElseThrow(() -> new IllegalArgumentException("The parquet_bloom_filter_columns property cannot be empty"));
if (parquetBloomFilterColumns.isEmpty()) {
// An empty array clears the setting: remove every existing per-column
// "write.parquet.bloom-filter-enabled.column.<name>" entry from the table.
icebergTable.properties().keySet().stream()
.filter(key -> key.startsWith(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX))
.forEach(updateProperties::remove);
}
else {
// Enable a bloom filter for each listed column via the per-column Iceberg property.
// NOTE(review): columns previously enabled but absent from the new list are NOT removed
// here (this branch only sets, never removes) — confirm that is the intended semantics.
parquetBloomFilterColumns.forEach(column -> updateProperties.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + column, "true"));
}
}

if (properties.containsKey(FILE_FORMAT_PROPERTY)) {
IcebergFileFormat fileFormat = (IcebergFileFormat) properties.get(FILE_FORMAT_PROPERTY)
.orElseThrow(() -> new IllegalArgumentException("The format property cannot be empty"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ public static long getSnapshotIdAsOfTime(Table table, long epochMillis)
.snapshotId();
}

private static void checkFormatForProperty(FileFormat actualStorageFormat, FileFormat expectedStorageFormat, String propertyName)
public static void checkFormatForProperty(FileFormat actualStorageFormat, FileFormat expectedStorageFormat, String propertyName)
{
if (actualStorageFormat != expectedStorageFormat) {
throw new TrinoException(INVALID_TABLE_PROPERTY, format("Cannot specify %s table property for storage format: %s", propertyName, actualStorageFormat));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,25 @@ public void testBloomFilterPropertiesArePersistedDuringCreate()
"format = 'parquet'," +
"parquet_bloom_filter_columns = array['a','B'])");

verifyTableProperties(tableName);
}

/**
 * Verifies that {@code parquet_bloom_filter_columns} set via {@code ALTER TABLE ... SET PROPERTIES}
 * is persisted to the table metadata, and that setting it to an empty array removes the
 * property from the table definition.
 */
@Test
void testBloomFilterPropertiesArePersistedDuringSetProperties()
{
String tableName = "test_metadata_write_properties_" + randomNameSuffix();
// Mixed-case column names ("A" vs 'a', 'B' vs "b") exercise case-insensitive column matching.
assertQuerySucceeds("CREATE TABLE " + tableName + "(A bigint, b bigint, c bigint)");

// Setting the property should persist the per-column bloom filter configuration.
assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES parquet_bloom_filter_columns = ARRAY['a','B']");
verifyTableProperties(tableName);

// Setting an empty array should clear the property entirely from the table definition.
assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES parquet_bloom_filter_columns = ARRAY[]");
assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName))
.doesNotContain("parquet_bloom_filter_columns");
}

private void verifyTableProperties(String tableName)
{
MaterializedResult actualProperties = computeActual("SELECT * FROM \"" + tableName + "$properties\"");
assertThat(actualProperties).isNotNull();
MaterializedResult expectedProperties = resultBuilder(getSession())
Expand Down

0 comments on commit bddec49

Please sign in to comment.