Commit f5d701c

Partition changes in REST catalog
1 parent a06c101 commit f5d701c

5 files changed (+225, -5 lines)


pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h

Lines changed: 1 addition & 0 deletions
@@ -56,6 +56,7 @@ typedef struct RestCatalogRequest
 	char *createTableBody;
 	char *addSnapshotBody;
 	char *addSchemaBody;
+	char *addPartitionBody;
 } RestCatalogRequest;

 extern PGDLLEXPORT char *RestCatalogFetchAccessToken(void);

pg_lake_iceberg/src/iceberg/metadata_operations.c

Lines changed: 7 additions & 4 deletions
@@ -224,10 +224,13 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT

 		AppendPartitionSpec(metadata, newSpec);

-		/*
-		 * TODO: Create RestCatalogRequest for updating the partitioning
-		 * in the writable rest catalog iceberg table.
-		 */
+		if (builder->regeneratePartitionSpec && writableRestCatalogTable)
+		{
+			RestCatalogRequest *request =
+				GetAddPartitionCatalogRequest(relationId, list_make1(newSpec));
+
+			restCatalogRequests = lappend(restCatalogRequests, request);
+		}
 	}
 }

pg_lake_iceberg/src/rest_catalog/rest_catalog.c

Lines changed: 30 additions & 0 deletions
@@ -885,3 +885,33 @@ GetAddSchemaCatalogRequest(Oid relationId, DataFileSchema * dataFileSchema)

 	return request;
 }
+
+
+/*
+ * GetAddPartitionCatalogRequest creates a RestCatalogRequest that adds a
+ * partition spec and sets it as the default (spec-id = -1 means "last added").
+ */
+RestCatalogRequest *
+GetAddPartitionCatalogRequest(Oid relationId, List *partitionSpecs)
+{
+	StringInfo body = makeStringInfo();
+
+	/* add-spec */
+	appendStringInfoString(body, "{\"action\":\"add-spec\",");
+
+	char *bodyPart = AppendIcebergPartitionSpecForRestCatalogStage(partitionSpecs);
+
+	appendStringInfoString(body, bodyPart);
+	appendStringInfoChar(body, '}');
+
+	/* set-default-spec to the one we just added */
+	appendStringInfoString(body, ", {\"action\":\"set-default-spec\",\"spec-id\":-1}");
+
+	RestCatalogRequest *request = palloc0(sizeof(RestCatalogRequest));
+
+	request->relationId = relationId;
+	request->operationType = REST_CATALOG_ADD_PARTITION;
+	request->addPartitionBody = body->data;
+
+	return request;
+}
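For reference, a rough sketch (not part of the commit) of the update payload this function serializes, written as Python data. The exact "spec" object comes from AppendIcebergPartitionSpecForRestCatalogStage, so the spec contents below (a hypothetical bucket(10, a) field) are illustrative only; the two entries mirror the add-spec and set-default-spec actions built in the code, with spec-id -1 referring to the spec added in the same commit.

# Illustration only: approximate shape of the two update actions emitted by
# GetAddPartitionCatalogRequest; the real "spec" body is produced by
# AppendIcebergPartitionSpecForRestCatalogStage and may differ in detail.
add_partition_updates = [
    {
        "action": "add-spec",
        "spec": {
            "spec-id": 1,
            "fields": [
                {
                    "source-id": 1,
                    "field-id": 1000,
                    "name": "a_bucket",
                    "transform": "bucket[10]",
                }
            ],
        },
    },
    # spec-id -1 = "the spec added in this same commit"
    {"action": "set-default-spec", "spec-id": -1},
]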

pg_lake_table/src/transaction/track_iceberg_metadata_changes.c

Lines changed: 12 additions & 0 deletions
@@ -283,6 +283,10 @@ PostAllRestCatalogRequests(void)
 		{
 			appendStringInfoString(batchRequestBody, request->addSchemaBody);
 		}
+		else if (request->operationType == REST_CATALOG_ADD_PARTITION)
+		{
+			appendStringInfoString(batchRequestBody, request->addPartitionBody);
+		}

 		bool lastRequest = (requestCell == list_tail(requestPerTable->tableModifyRequests));

@@ -494,6 +498,11 @@ RecordRestCatalogRequestInTx(Oid relationId, RestCatalogOperationType operationT
 		request->addSchemaBody = pstrdup(body);
 		requestPerTable->tableModifyRequests = lappend(requestPerTable->tableModifyRequests, request);
 	}
+	else if (operationType == REST_CATALOG_ADD_PARTITION)
+	{
+		request->addPartitionBody = pstrdup(body);
+		requestPerTable->tableModifyRequests = lappend(requestPerTable->tableModifyRequests, request);
+	}

 	MemoryContextSwitchTo(oldContext);
 }

@@ -609,6 +618,9 @@ ApplyTrackedIcebergMetadataChanges(void)
 		case REST_CATALOG_ADD_SCHEMA:
 			body = request->addSchemaBody;
 			break;
+		case REST_CATALOG_ADD_PARTITION:
+			body = request->addPartitionBody;
+			break;
 		default:
 			elog(ERROR, "unsupported rest catalog operation type: %d", request->operationType);
 	}

pg_lake_table/tests/pytests/test_polaris_catalog.py

Lines changed: 175 additions & 1 deletion
@@ -145,7 +145,14 @@ def test_writable_rest_basic_flow(


 def test_writable_rest_ddl(
-    pg_conn, s3, polaris_session, set_polaris_gucs, with_default_location, installcheck
+    pg_conn,
+    s3,
+    polaris_session,
+    set_polaris_gucs,
+    with_default_location,
+    installcheck,
+    create_http_helper_functions,
+    superuser_conn,
 ):

     if installcheck:

@@ -161,6 +168,11 @@ def test_writable_rest_ddl(
         pg_conn,
     )

+    run_command(
+        f"""CREATE TABLE test_writable_rest_ddl.writable_rest_3 USING iceberg WITH (catalog='rest', partition_by='a') AS SELECT 10000 AS a UNION SELECT 10001 as a""",
+        pg_conn,
+    )
+
     pg_conn.commit()

     # a DDL to a single table

@@ -181,6 +193,10 @@ def test_writable_rest_ddl(
     assert columns[0][0] == "a"
     assert columns[1][0] == "b"

+    assert_metadata_on_pg_catalog_and_rest_matches(
+        "test_writable_rest_ddl", "writable_rest_3", superuser_conn, installcheck
+    )
+
     # multiple DDLs to a single table
     # a DDL to a single table
     run_command(

@@ -206,6 +222,20 @@ def test_writable_rest_ddl(
     assert columns[2][0] == "c"
     assert columns[3][0] == "d"

+    # run multiple partition changes on a single table
+    run_command(
+        """
+        ALTER TABLE test_writable_rest_ddl.writable_rest_3 OPTIONS (SET partition_by 'bucket(10,a)');
+        ALTER TABLE test_writable_rest_ddl.writable_rest_3 OPTIONS (SET partition_by 'truncate(20,a)');
+        """,
+        pg_conn,
+    )
+    pg_conn.commit()
+
+    assert_metadata_on_pg_catalog_and_rest_matches(
+        "test_writable_rest_ddl", "writable_rest_3", superuser_conn, installcheck
+    )
+
     # multiple DDLs to multiple tables
     run_command(
         """

@@ -215,6 +245,9 @@ def test_writable_rest_ddl(
         ALTER TABLE test_writable_rest_ddl.writable_rest_2 ADD COLUMN b INT;
         ALTER TABLE test_writable_rest_ddl.writable_rest_2 ADD COLUMN c INT;

+        ALTER TABLE test_writable_rest_ddl.writable_rest_3 OPTIONS (SET partition_by 'truncate(30,a)');
+        ALTER TABLE test_writable_rest_ddl.writable_rest_3 OPTIONS (SET partition_by 'truncate(40,a)');
+
         """,
         pg_conn,
     )

@@ -250,15 +283,26 @@ def test_writable_rest_ddl(
     assert columns[1][0] == "b"
     assert columns[2][0] == "c"

+    assert_metadata_on_pg_catalog_and_rest_matches(
+        "test_writable_rest_ddl", "writable_rest_3", superuser_conn, installcheck
+    )
+
     # modify table and DDL on a single table
     run_command(
         """
        ALTER TABLE test_writable_rest_ddl.writable_rest ADD COLUMN g INT;
        INSERT INTO test_writable_rest_ddl.writable_rest (a,g) VALUES (101,101);
+       ALTER TABLE test_writable_rest_ddl.writable_rest OPTIONS (ADD partition_by 'truncate(30,a)');
+
         """,
         pg_conn,
     )
     pg_conn.commit()
+
+    assert_metadata_on_pg_catalog_and_rest_matches(
+        "test_writable_rest_ddl", "writable_rest", superuser_conn, installcheck
+    )
+
     run_command(
         f"""CREATE TABLE test_writable_rest_ddl.readable_rest_5() USING iceberg WITH (catalog='rest', read_only=True, catalog_table_name='writable_rest')""",
         pg_conn,

@@ -1112,6 +1156,136 @@ def create_rest_catalog_table(
     return iceberg_table


+# TODO: extend this to cover all the metadata; currently it only covers partitions.
+# This is a variation of ExternalHeavyAssertsOnIcebergMetadataChange() in the source code;
+# we cannot apply that function to REST catalog tables, as the changes happen at post-commit.
+
+
+def assert_metadata_on_pg_catalog_and_rest_matches(
+    namespace, table_name, superuser_conn, installcheck
+):
+
+    metadata = get_rest_table_metadata(
+        namespace, table_name, superuser_conn, installcheck
+    )
+
+    # 1) default-spec-id check
+    catalog_default_spec_id = run_query(
+        f"select default_spec_id "
+        f"from lake_iceberg.tables_internal "
+        f"WHERE table_name = '{namespace}.{table_name}'::regclass;",
+        superuser_conn,
+    )[0][0]
+
+    metadata_default_spec_id = metadata["metadata"]["default-spec-id"]
+
+    assert catalog_default_spec_id == metadata_default_spec_id, (
+        f"default-spec-ids don't match: "
+        f"catalog={catalog_default_spec_id}, metadata={metadata_default_spec_id}"
+    )
+
+    # 2) partition spec id checks
+    specs = metadata["metadata"].get("partition-specs", [])
+    metadata_spec_ids = sorted(spec["spec-id"] for spec in specs)
+
+    catalog_specs_rows = run_query(
+        f"""
+        SELECT spec_id
+        FROM lake_table.partition_specs
+        WHERE table_name = '{namespace}.{table_name}'::regclass
+        ORDER BY spec_id
+        """,
+        superuser_conn,
+    )
+    catalog_spec_ids = [row[0] for row in catalog_specs_rows]
+
+    assert catalog_spec_ids == metadata_spec_ids, (
+        f"partition spec ids don't match: "
+        f"catalog={catalog_spec_ids}, metadata={metadata_spec_ids}"
+    )
+
+    # 3) partition fields check
+    catalog_fields_rows = run_query(
+        f"""
+        SELECT spec_id,
+               source_field_id,
+               partition_field_id,
+               partition_field_name,
+               transform_name
+        FROM lake_table.partition_fields
+        WHERE table_name = '{namespace}.{table_name}'::regclass
+        ORDER BY spec_id, partition_field_id
+        """,
+        superuser_conn,
+    )
+
+    # spec_id -> list of field dicts
+    catalog_fields_by_spec = {}
+    for (
+        spec_id,
+        source_field_id,
+        partition_field_id,
+        partition_field_name,
+        transform_name,
+    ) in catalog_fields_rows:
+        catalog_fields_by_spec.setdefault(spec_id, []).append(
+            {
+                "source-id": source_field_id,
+                "field-id": partition_field_id,
+                "name": partition_field_name,
+                "transform": transform_name,
+            }
+        )
+
+    for spec in specs:
+        spec_id = spec["spec-id"]
+
+        metadata_fields = [
+            {
+                "source-id": f["source-id"],
+                "field-id": f["field-id"],
+                "name": f["name"],
+                "transform": f["transform"],
+            }
+            for f in spec.get("fields", [])
+        ]
+
+        metadata_fields_sorted = sorted(metadata_fields, key=lambda f: f["field-id"])
+        catalog_fields_sorted = sorted(
+            catalog_fields_by_spec.get(spec_id, []),
+            key=lambda f: f["field-id"],
+        )
+
+        assert catalog_fields_sorted == metadata_fields_sorted, (
+            f"partition fields don't match for spec_id {spec_id}: "
+            f"catalog={catalog_fields_sorted}, metadata={metadata_fields_sorted}"
+        )
+
+
+def get_rest_table_metadata(
+    encoded_namespace, encoded_table_name, pg_conn, installcheck
+):
+    if installcheck:
+        return
+    url = f"http://{server_params.POLARIS_HOSTNAME}:{server_params.POLARIS_PORT}/api/catalog/v1/{server_params.PG_DATABASE}/namespaces/{encoded_namespace}/tables/{encoded_table_name}"
+    token = get_polaris_access_token()
+
+    res = run_query(
+        f"""
+        SELECT *
+        FROM lake_iceberg.test_http_get(
+            '{url}',
+            ARRAY['Authorization: Bearer {token}']);
+        """,
+        pg_conn,
+    )
+    assert res[0][0] == 200
+    status, json_str, headers = res[0]
+
+    return json.loads(json_str)
+
+
 def get_namespace_location(encoded_namespace, pg_conn, installcheck):
     if installcheck:
         return
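As a reading aid (not part of the commit), the new helper compares the catalog tables against the "metadata" object of the response returned by get_rest_table_metadata. A minimal sketch of the subset it inspects, with made-up values:

# Illustration only: the keys that assert_metadata_on_pg_catalog_and_rest_matches
# reads from the REST load-table response; the real payload is fetched via
# lake_iceberg.test_http_get and contains much more.
example_load_table_response = {
    "metadata": {
        "default-spec-id": 1,
        "partition-specs": [
            {"spec-id": 0, "fields": []},
            {
                "spec-id": 1,
                "fields": [
                    {
                        "source-id": 1,
                        "field-id": 1000,
                        "name": "a_trunc",
                        "transform": "truncate[20]",
                    }
                ],
            },
        ],
    }
}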
