Skip to content

Commit 358bff7

Browse files
Do not add a partition spec if it already exists
Both Spark and Polaris follow this. Before this commit, we blindly added partition specs, even if the same spec existed before. Now, we are switching to a more proper approach: if the same spec already exists in the table, we simply reuse it instead of adding one more spec. This is especially useful for #68, where Polaris throws an error if we try to send a partition spec that already exists. Here `the same spec` means a spec that has exactly the same fields as an existing spec in the table. To give an example, previously we created 3 specs for the following, now 2: ```sql ALTER TABLE t OPTION (ADD partition_by='bucket(10,a)'); .. ALTER TABLE t OPTION (ADD partition_by='bucket(20,a)'); .. -- back to 10, this does not generate a new spec anymore ALTER TABLE t OPTION (ADD partition_by='bucket(10,a)'); ``` On Polaris, before this commit, we'd get the following, given Polaris thinks this spec already exists: ``` alter table t (set partition_by 'bucket(10,a)'); WARNING: HTTP request failed (HTTP 400) DETAIL: Cannot set last added spec: no spec has been added HINT: The rest catalog returned error type: ValidationException ``` Signed-off-by: Onder KALACI <[email protected]> (cherry picked from commit 75b4eda)
1 parent 1e80069 commit 358bff7

File tree

6 files changed

+308
-25
lines changed

6 files changed

+308
-25
lines changed

pg_lake_table/include/pg_lake/fdw/partition_transform.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ extern void *ApplyBucketTransformToColumn(IcebergPartitionTransform * transform,
2828
Datum columnValue, bool isNull,
2929
size_t *bucketSize);
3030
extern List *CurrentPartitionTransformList(Oid relationId);
31+
extern IcebergPartitionSpec * PartitionAlreadyExistsInTable(Oid relationId, List *partitionTransforms);
3132
extern List *AllPartitionTransformList(Oid relationId);
3233
extern List *GetPartitionTransformsFromSpecFields(Oid relationId, List *specFields);
3334
extern void *DeserializePartitionValueFromPGText(IcebergPartitionTransform * transform,

pg_lake_table/include/pg_lake/partitioning/partition_spec_catalog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ typedef struct IcebergPartitionSpecHashEntry
4343
extern void UpdateDefaultPartitionSpecId(Oid relationId, int specId);
4444
extern void InsertPartitionSpecAndPartitionFields(Oid relationId, IcebergPartitionSpec * spec);
4545
extern int GetLargestSpecId(Oid relationId);
46+
extern List *GetAllIcebergPartitionSpecIds(Oid relationId);
4647
extern PGDLLEXPORT int GetCurrentSpecId(Oid relationId);
4748
extern int GetLargestPartitionFieldId(Oid relationId);
4849
extern IcebergPartitionSpecField * GetIcebergPartitionFieldFromCatalog(Oid relationId, int fieldId);

pg_lake_table/src/ddl/ddl_changes.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,15 @@ ApplyDDLCatalogChanges(Oid relationId, List *ddlOperations,
219219
List *parsedTransforms = ddlOperation->parsedTransforms;
220220
List *analyzedTransforms = AnalyzeIcebergTablePartitionBy(relationId, parsedTransforms);
221221

222+
*partitionSpec = PartitionAlreadyExistsInTable(relationId, analyzedTransforms);
223+
if (*partitionSpec != NULL)
224+
{
225+
/* update lake_iceberg.internal_tables specId */
226+
UpdateDefaultPartitionSpecId(relationId, (*partitionSpec)->spec_id);
227+
228+
return;
229+
}
230+
222231
/*
223232
* creatint a new spec, get the largest spec and we'll increment
224233
* it if needed when assigning

pg_lake_table/src/fdw/partition_transform.c

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ static void ParseTransformName(const char *name, IcebergPartitionTransformType *
6363
static bool ParseBracketUintSize(const char *name, const char *prefix, size_t *outVal);
6464
static void *DatumToPartitionValue(IcebergPartitionTransform * transform, Datum columnValue, bool isNull,
6565
size_t *valuelength);
66+
static bool PartitionTransformsEqual(IcebergPartitionSpec * spec, List *partitionTransforms);
6667

6768
/*
6869
* ComputePartitionTupleForTuple applies relative partition transforms
@@ -91,6 +92,77 @@ ComputePartitionTupleForTuple(List *transforms, TupleTableSlot *slot)
9192
}
9293

9394

95+
/*
96+
* If a partition spec with the given partition transforms already exists
97+
* for the given relation, returns the partition spec. Otherwise, it
98+
* returns -1.
99+
*/
100+
IcebergPartitionSpec *
101+
PartitionAlreadyExistsInTable(Oid relationId, List *partitionTransforms)
102+
{
103+
HTAB *allSpecs = GetAllPartitionSpecsFromCatalog(relationId);
104+
105+
HASH_SEQ_STATUS currentSpecsStatus;
106+
107+
hash_seq_init(&currentSpecsStatus, allSpecs);
108+
IcebergPartitionSpecHashEntry *currentSpec = NULL;
109+
110+
while ((currentSpec = hash_seq_search(&currentSpecsStatus)) != NULL)
111+
{
112+
if (PartitionTransformsEqual(currentSpec->spec, partitionTransforms))
113+
{
114+
IcebergPartitionSpec *foundSpec = currentSpec->spec;
115+
116+
hash_seq_term(&currentSpecsStatus);
117+
118+
return foundSpec;
119+
}
120+
}
121+
122+
return NULL;
123+
}
124+
125+
/*
126+
* PartitionTransformsEqual compares the given partition spec
127+
* with the given list of partition transforms. It returns true
128+
* if they are equal, false otherwise.
129+
*/
130+
static bool
131+
PartitionTransformsEqual(IcebergPartitionSpec * spec, List *partitionTransforms)
132+
{
133+
if (spec->fields_length != list_length(partitionTransforms))
134+
return false;
135+
136+
for (int i = 0; i < spec->fields_length; i++)
137+
{
138+
IcebergPartitionSpecField *specField = &spec->fields[i];
139+
IcebergPartitionTransform *transform = list_nth(partitionTransforms, i);
140+
141+
/*
142+
* Normally, the name check should be sufficient, as we currently do
143+
* not allow renaming any column that is used in partitioning, see
144+
* ErrorIfColumnEverUsedInIcebergPartitionSpec(). Still, let's be
145+
* defensive and also check source field ids.
146+
*/
147+
if (specField->source_id != transform->sourceField->id)
148+
return false;
149+
150+
/*
151+
* This like a_bucket_10, c_year, etc., so we it includes both the
152+
* column name, the transform type and the parameters to
153+
* bucket/truncate transforms.
154+
*/
155+
if (strcasecmp(specField->name, transform->partitionFieldName) != 0)
156+
{
157+
return false;
158+
}
159+
}
160+
161+
return true;
162+
}
163+
164+
165+
94166
/*
95167
* For a given relationId, this function returns the list of
96168
* IcebergPartitionTransform for the table with the current

pg_lake_table/tests/pytests/test_iceberg_partitions_misc.py

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,183 @@
11
from utils_pytest import *
22

33

4+
# show that re-setting the same partition spec
# doesn't add a new spec, instead re-uses
def test_re_set_partition_fields(
    extension, s3, with_default_location, pg_conn, superuser_conn
):

    run_command("CREATE SCHEMA test_re_set_partition_fields;", pg_conn)

    run_command(
        "CREATE TABLE test_re_set_partition_fields.tbl(key int, value text) USING iceberg",
        pg_conn,
    )
    pg_conn.commit()

    def assert_spec_state(expected_spec_id, expected_spec_count):
        # verify the internal catalog and the Iceberg metadata agree on the
        # default spec id, and that the expected number of specs exists
        default_spec_id = run_query(
            "SELECT default_spec_id FROM lake_iceberg.tables_internal WHERE table_name = 'test_re_set_partition_fields.tbl'::regclass",
            superuser_conn,
        )
        assert default_spec_id[0][0] == expected_spec_id
        assert (
            default_table_partition_spec_id(
                pg_conn, "test_re_set_partition_fields", "tbl"
            )
            == expected_spec_id
        )
        assert (
            len(table_partition_specs(pg_conn, "test_re_set_partition_fields", "tbl"))
            == expected_spec_count
        )

    # when there are no partitions, the default spec-id is used, and there
    # are no corresponding entries in partition_specs
    assert_spec_state(0, 1)

    # now, add a spec, and make sure it is pushed
    run_command(
        "ALTER TABLE test_re_set_partition_fields.tbl OPTIONS (ADD partition_by 'key')",
        pg_conn,
    )
    pg_conn.commit()
    assert_spec_state(1, 2)

    # now, drop the spec, and make sure it is pushed
    run_command(
        "ALTER TABLE test_re_set_partition_fields.tbl OPTIONS (DROP partition_by)",
        pg_conn,
    )
    pg_conn.commit()
    assert_spec_state(0, 2)

    # now, add another partition spec
    run_command(
        "ALTER TABLE test_re_set_partition_fields.tbl OPTIONS (ADD partition_by 'bucket(10, key)')",
        pg_conn,
    )
    pg_conn.commit()
    assert_spec_state(2, 3)

    # setting back to an existing spec should re-use it, not push a new id
    run_command(
        "ALTER TABLE test_re_set_partition_fields.tbl OPTIONS (SET partition_by 'key')",
        pg_conn,
    )
    pg_conn.commit()
    assert_spec_state(1, 3)

    # a spec with two fields swapped is considered a separate spec
    run_command(
        "ALTER TABLE test_re_set_partition_fields.tbl OPTIONS (SET partition_by 'truncate(100, key), bucket(50, value)')",
        pg_conn,
    )
    run_command(
        "ALTER TABLE test_re_set_partition_fields.tbl OPTIONS (SET partition_by 'bucket(50, value), truncate(100, key)')",
        pg_conn,
    )
    pg_conn.commit()
    assert_spec_state(4, 5)

    # going back to the first ordering re-uses the earlier spec id
    run_command(
        "ALTER TABLE test_re_set_partition_fields.tbl OPTIONS (SET partition_by 'truncate(100, key), bucket(50, value)')",
        pg_conn,
    )
    pg_conn.commit()
    assert_spec_state(3, 5)

    # finally, drop the spec and finish the test
    run_command(
        "ALTER TABLE test_re_set_partition_fields.tbl OPTIONS (DROP partition_by)",
        pg_conn,
    )
    pg_conn.commit()
    assert_spec_state(0, 5)

    run_command("DROP SCHEMA test_re_set_partition_fields CASCADE;", pg_conn)
    pg_conn.commit()
179+
180+
4181
def test_ddl_with_identity_partitions(extension, s3, with_default_location, pg_conn):
5182
run_command(f"""CREATE SCHEMA test_ddl_with_partitions;""", pg_conn)
6183

@@ -455,3 +632,26 @@ def test_partition_by_with_collation(extension, s3, with_default_location, pg_co
455632

456633
run_command("DROP COLLATION s_coll;", pg_conn)
457634
pg_conn.commit()
635+
636+
637+
def table_metadata(pg_conn, table_namespace, table_name):
    # Look up the current metadata file location for the table from the
    # iceberg_tables catalog, then parse the metadata document via
    # lake_iceberg.metadata() and return it.
    location_rows = run_query(
        f"SELECT metadata_location FROM iceberg_tables WHERE table_name = '{table_name}' and table_namespace = '{table_namespace}'",
        pg_conn,
    )
    metadata_location = location_rows[0][0]

    metadata_rows = run_query(
        f"SELECT * FROM lake_iceberg.metadata('{metadata_location}')", pg_conn
    )
    return metadata_rows[0][0]
648+
649+
650+
def table_partition_specs(pg_conn, table_namespace, table_name):
    # list of partition specs recorded in the table's Iceberg metadata
    return table_metadata(pg_conn, table_namespace, table_name)["partition-specs"]
653+
654+
655+
def default_table_partition_spec_id(pg_conn, table_namespace, table_name):
    # default spec id as recorded in the table's Iceberg metadata
    return table_metadata(pg_conn, table_namespace, table_name)["default-spec-id"]

0 commit comments

Comments
 (0)