Skip to content

Commit c5684c6

Browse files
Do not add a partition spec if it already exists (#79)
Both Spark and Polaris follows this. Before this commit, we blindly added partition specs, even if the same spec existed before. Now, we are switching to a more proper way: if the same spec exists in the table earlier, we simply use that instead of adding one more spec. This is especially useful for #68, where Polaris throws an error if we try to send a partition spec that already exists. Here `the same spec` means a spec that has the exact same fields with an existing spec in the table. To give an example, previously we created 3 specs for the following, now 2: ```sql ALTER TABLE t OPTION (ADD partition_by='bucket(10,a)'); .. ALTER TABLE t OPTION (ADD partition_by='bucket(20,a)'); .. -- back to 10, this does not generate a new spec anymore ALTER TABLE t OPTION (ADD partition_by='bucket(10,a)'); ``` On Polaris, before this commit, we'd get the following, given Polaris thinks this spec already exists: ``` alter table t (set partition_by 'bucket(10,a)'); WARNING: HTTP request failed (HTTP 400) DETAIL: Cannot set last added spec: no spec has been added HINT: The rest catalog returned error type: ValidationException ``` Signed-off-by: Onder KALACI <[email protected]>
1 parent 1110dfb commit c5684c6

File tree

6 files changed

+315
-25
lines changed

6 files changed

+315
-25
lines changed

pg_lake_table/include/pg_lake/fdw/partition_transform.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ extern void *ApplyBucketTransformToColumn(IcebergPartitionTransform * transform,
2828
Datum columnValue, bool isNull,
2929
size_t *bucketSize);
3030
extern List *CurrentPartitionTransformList(Oid relationId);
31+
extern IcebergPartitionSpec * GetPartitionSpecIfAlreadyExist(Oid relationId, List *partitionTransforms);
3132
extern List *AllPartitionTransformList(Oid relationId);
3233
extern List *GetPartitionTransformsFromSpecFields(Oid relationId, List *specFields);
3334
extern void *DeserializePartitionValueFromPGText(IcebergPartitionTransform * transform,

pg_lake_table/include/pg_lake/partitioning/partition_spec_catalog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ typedef struct IcebergPartitionSpecHashEntry
4343
extern void UpdateDefaultPartitionSpecId(Oid relationId, int specId);
4444
extern void InsertPartitionSpecAndPartitionFields(Oid relationId, IcebergPartitionSpec * spec);
4545
extern int GetLargestSpecId(Oid relationId);
46+
extern List *GetAllIcebergPartitionSpecIds(Oid relationId);
4647
extern PGDLLEXPORT int GetCurrentSpecId(Oid relationId);
4748
extern int GetLargestPartitionFieldId(Oid relationId);
4849
extern IcebergPartitionSpecField * GetIcebergPartitionFieldFromCatalog(Oid relationId, int fieldId);

pg_lake_table/src/ddl/ddl_changes.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,15 @@ ApplyDDLCatalogChanges(Oid relationId, List *ddlOperations,
212212
List *parsedTransforms = ddlOperation->parsedTransforms;
213213
List *analyzedTransforms = AnalyzeIcebergTablePartitionBy(relationId, parsedTransforms);
214214

215+
*partitionSpec = GetPartitionSpecIfAlreadyExist(relationId, analyzedTransforms);
216+
if (*partitionSpec != NULL)
217+
{
218+
/* update lake_iceberg.internal_tables specId */
219+
UpdateDefaultPartitionSpecId(relationId, (*partitionSpec)->spec_id);
220+
221+
return;
222+
}
223+
215224
/*
216225
* creatint a new spec, get the largest spec and we'll increment
217226
* it if needed when assigning

pg_lake_table/src/fdw/partition_transform.c

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ static void ParseTransformName(const char *name, IcebergPartitionTransformType *
6363
static bool ParseBracketUintSize(const char *name, const char *prefix, size_t *outVal);
6464
static void *DatumToPartitionValue(IcebergPartitionTransform * transform, Datum columnValue, bool isNull,
6565
size_t *valuelength);
66+
static bool PartitionTransformsEqual(IcebergPartitionSpec * spec, List *partitionTransforms);
6667

6768
/*
6869
* ComputePartitionTupleForTuple applies relative partition transforms
@@ -91,6 +92,84 @@ ComputePartitionTupleForTuple(List *transforms, TupleTableSlot *slot)
9192
}
9293

9394

95+
/*
96+
* If a partition spec with the given partition transforms already exists
97+
* for the given relation, returns the partition spec. Otherwise, it
98+
* returns NULL.
99+
*/
100+
IcebergPartitionSpec *
101+
GetPartitionSpecIfAlreadyExist(Oid relationId, List *partitionTransforms)
102+
{
103+
HTAB *allSpecs = GetAllPartitionSpecsFromCatalog(relationId);
104+
105+
HASH_SEQ_STATUS currentSpecsStatus;
106+
107+
hash_seq_init(&currentSpecsStatus, allSpecs);
108+
IcebergPartitionSpecHashEntry *currentSpec = NULL;
109+
110+
while ((currentSpec = hash_seq_search(&currentSpecsStatus)) != NULL)
111+
{
112+
/*
113+
* Partition fields in spec is already ordered by partition field id,
114+
* so we can compare them one by one with the given partition
115+
* transforms.
116+
*/
117+
if (PartitionTransformsEqual(currentSpec->spec, partitionTransforms))
118+
{
119+
IcebergPartitionSpec *foundSpec = currentSpec->spec;
120+
121+
hash_seq_term(&currentSpecsStatus);
122+
123+
return foundSpec;
124+
}
125+
}
126+
127+
return NULL;
128+
}
129+
130+
/*
131+
* PartitionTransformsEqual compares the given partition spec
132+
* with the given list of partition transforms. It returns true
133+
* if they are equal, false otherwise.
134+
*/
135+
static bool
136+
PartitionTransformsEqual(IcebergPartitionSpec * spec, List *partitionTransforms)
137+
{
138+
if (spec->fields_length != list_length(partitionTransforms))
139+
return false;
140+
141+
for (int i = 0; i < spec->fields_length; i++)
142+
{
143+
IcebergPartitionSpecField *specField = &spec->fields[i];
144+
IcebergPartitionTransform *transform = list_nth(partitionTransforms, i);
145+
146+
/*
147+
* Normally, the name check should be sufficient, as we currently do
148+
* not allow renaming any column that is used in partitioning, see
149+
* ErrorIfColumnEverUsedInIcebergPartitionSpec(). Still, let's be
150+
* defensive and also check source field ids.
151+
*/
152+
if (specField->source_id != transform->sourceField->id)
153+
return false;
154+
155+
/*
156+
* This like a_bucket_10, c_year, etc., so we it includes both the
157+
* column name, the transform type and the parameters to
158+
* bucket/truncate transforms. We are essentially following what
159+
* Iceberg does here:
160+
* https://github.com/apache/iceberg/blob/8b55ac834015ce664f879ecfe1e80a941a994420/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L239-L259
161+
*/
162+
if (strcasecmp(specField->name, transform->partitionFieldName) != 0)
163+
{
164+
return false;
165+
}
166+
}
167+
168+
return true;
169+
}
170+
171+
172+
94173
/*
95174
* For a given relationId, this function returns the list of
96175
* IcebergPartitionTransform for the table with the current

pg_lake_table/tests/pytests/test_iceberg_partitions_misc.py

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,183 @@
11
from utils_pytest import *
22

33

4+
# show that re-setting the same partition spec
5+
# doesn't add a new spec, instead re-uses
6+
def test_re_set_partition_fields(
7+
extension, s3, with_default_location, pg_conn, superuser_conn
8+
):
9+
10+
run_command(f"""CREATE SCHEMA test_re_set_partition_fields;""", pg_conn)
11+
12+
run_command(
13+
"CREATE TABLE test_re_set_partition_fields.tbl(key int, value text) USING iceberg",
14+
pg_conn,
15+
)
16+
pg_conn.commit()
17+
18+
# when there are no partitions, the default spec-id is used, and there are no corresponding entries
19+
# in partition_specs
20+
default_spec_id = run_query(
21+
"SELECT default_spec_id FROM lake_iceberg.tables_internal WHERE table_name = 'test_re_set_partition_fields.tbl'::regclass",
22+
superuser_conn,
23+
)
24+
assert default_spec_id[0][0] == 0
25+
assert (
26+
default_table_partition_spec_id(pg_conn, "test_re_set_partition_fields", "tbl")
27+
== 0
28+
)
29+
assert (
30+
len(table_partition_specs(pg_conn, "test_re_set_partition_fields", "tbl")) == 1
31+
)
32+
33+
# now, add a spec, and make sure it is pushed
34+
run_command(
35+
f"ALTER TABLE test_re_set_partition_fields.tbl OPTIONS (ADD partition_by 'key')",
36+
pg_conn,
37+
)
38+
pg_conn.commit()
39+
40+
default_spec_id = run_query(
41+
"SELECT default_spec_id FROM lake_iceberg.tables_internal WHERE table_name = 'test_re_set_partition_fields.tbl'::regclass",
42+
superuser_conn,
43+
)
44+
assert default_spec_id[0][0] == 1
45+
assert (
46+
default_table_partition_spec_id(pg_conn, "test_re_set_partition_fields", "tbl")
47+
== 1
48+
)
49+
assert (
50+
len(table_partition_specs(pg_conn, "test_re_set_partition_fields", "tbl")) == 2
51+
)
52+
53+
# now, drop the spec, and make sure it is pushed
54+
run_command(
55+
f"ALTER TABLE test_re_set_partition_fields.tbl OPTIONS (DROP partition_by)",
56+
pg_conn,
57+
)
58+
pg_conn.commit()
59+
60+
default_spec_id = run_query(
61+
"SELECT default_spec_id FROM lake_iceberg.tables_internal WHERE table_name = 'test_re_set_partition_fields.tbl'::regclass",
62+
superuser_conn,
63+
)
64+
assert default_spec_id[0][0] == 0
65+
assert (
66+
default_table_partition_spec_id(pg_conn, "test_re_set_partition_fields", "tbl")
67+
== 0
68+
)
69+
assert (
70+
len(table_partition_specs(pg_conn, "test_re_set_partition_fields", "tbl")) == 2
71+
)
72+
73+
# now, add another partition spec
74+
run_command(
75+
f"ALTER TABLE test_re_set_partition_fields.tbl OPTIONS (ADD partition_by 'bucket(10, key)')",
76+
pg_conn,
77+
)
78+
pg_conn.commit()
79+
80+
default_spec_id = run_query(
81+
"SELECT default_spec_id FROM lake_iceberg.tables_internal WHERE table_name = 'test_re_set_partition_fields.tbl'::regclass",
82+
superuser_conn,
83+
)
84+
assert default_spec_id[0][0] == 2
85+
assert (
86+
default_table_partition_spec_id(pg_conn, "test_re_set_partition_fields", "tbl")
87+
== 2
88+
)
89+
assert (
90+
len(table_partition_specs(pg_conn, "test_re_set_partition_fields", "tbl")) == 3
91+
)
92+
93+
# setting back to existing one should not push a new id
94+
# now, add a spec, and make sure it is pushed
95+
run_command(
96+
f"ALTER TABLE test_re_set_partition_fields.tbl OPTIONS (SET partition_by 'key')",
97+
pg_conn,
98+
)
99+
pg_conn.commit()
100+
101+
default_spec_id = run_query(
102+
"SELECT default_spec_id FROM lake_iceberg.tables_internal WHERE table_name = 'test_re_set_partition_fields.tbl'::regclass",
103+
superuser_conn,
104+
)
105+
assert default_spec_id[0][0] == 1
106+
assert (
107+
default_table_partition_spec_id(pg_conn, "test_re_set_partition_fields", "tbl")
108+
== 1
109+
)
110+
assert (
111+
len(table_partition_specs(pg_conn, "test_re_set_partition_fields", "tbl")) == 3
112+
)
113+
114+
# a spec with two fields swapped are considered separate specs
115+
run_command(
116+
f"ALTER TABLE test_re_set_partition_fields.tbl OPTIONS (SET partition_by 'truncate(100, key), bucket(50, value)')",
117+
pg_conn,
118+
)
119+
run_command(
120+
f"ALTER TABLE test_re_set_partition_fields.tbl OPTIONS (SET partition_by 'bucket(50, value), truncate(100, key)')",
121+
pg_conn,
122+
)
123+
pg_conn.commit()
124+
125+
default_spec_id = run_query(
126+
"SELECT default_spec_id FROM lake_iceberg.tables_internal WHERE table_name = 'test_re_set_partition_fields.tbl'::regclass",
127+
superuser_conn,
128+
)
129+
assert default_spec_id[0][0] == 4
130+
assert (
131+
default_table_partition_spec_id(pg_conn, "test_re_set_partition_fields", "tbl")
132+
== 4
133+
)
134+
assert (
135+
len(table_partition_specs(pg_conn, "test_re_set_partition_fields", "tbl")) == 5
136+
)
137+
138+
run_command(
139+
f"ALTER TABLE test_re_set_partition_fields.tbl OPTIONS (SET partition_by 'truncate(100, key), bucket(50, value)')",
140+
pg_conn,
141+
)
142+
pg_conn.commit()
143+
144+
default_spec_id = run_query(
145+
"SELECT default_spec_id FROM lake_iceberg.tables_internal WHERE table_name = 'test_re_set_partition_fields.tbl'::regclass",
146+
superuser_conn,
147+
)
148+
assert default_spec_id[0][0] == 3
149+
assert (
150+
default_table_partition_spec_id(pg_conn, "test_re_set_partition_fields", "tbl")
151+
== 3
152+
)
153+
assert (
154+
len(table_partition_specs(pg_conn, "test_re_set_partition_fields", "tbl")) == 5
155+
)
156+
157+
# finally, drop the spec and finish the test
158+
run_command(
159+
f"ALTER TABLE test_re_set_partition_fields.tbl OPTIONS (DROP partition_by)",
160+
pg_conn,
161+
)
162+
pg_conn.commit()
163+
164+
default_spec_id = run_query(
165+
"SELECT default_spec_id FROM lake_iceberg.tables_internal WHERE table_name = 'test_re_set_partition_fields.tbl'::regclass",
166+
superuser_conn,
167+
)
168+
assert default_spec_id[0][0] == 0
169+
assert (
170+
default_table_partition_spec_id(pg_conn, "test_re_set_partition_fields", "tbl")
171+
== 0
172+
)
173+
assert (
174+
len(table_partition_specs(pg_conn, "test_re_set_partition_fields", "tbl")) == 5
175+
)
176+
177+
run_command(f"""DROP SCHEMA test_re_set_partition_fields CASCADE;""", pg_conn)
178+
pg_conn.commit()
179+
180+
4181
def test_ddl_with_identity_partitions(extension, s3, with_default_location, pg_conn):
5182
run_command(f"""CREATE SCHEMA test_ddl_with_partitions;""", pg_conn)
6183

@@ -455,3 +632,26 @@ def test_partition_by_with_collation(extension, s3, with_default_location, pg_co
455632

456633
run_command("DROP COLLATION s_coll;", pg_conn)
457634
pg_conn.commit()
635+
636+
637+
def table_metadata(pg_conn, table_namespace, table_name):
638+
metadata_location = run_query(
639+
f"SELECT metadata_location FROM iceberg_tables WHERE table_name = '{table_name}' and table_namespace = '{table_namespace}'",
640+
pg_conn,
641+
)[0][0]
642+
643+
pg_query = f"SELECT * FROM lake_iceberg.metadata('{metadata_location}')"
644+
645+
metadata = run_query(pg_query, pg_conn)[0][0]
646+
647+
return metadata
648+
649+
650+
def table_partition_specs(pg_conn, table_namespace, table_name):
651+
metadata = table_metadata(pg_conn, table_namespace, table_name)
652+
return metadata["partition-specs"]
653+
654+
655+
def default_table_partition_spec_id(pg_conn, table_namespace, table_name):
656+
metadata = table_metadata(pg_conn, table_namespace, table_name)
657+
return metadata["default-spec-id"]

0 commit comments

Comments
 (0)