Skip to content

Commit 1e80069

Browse files
Finish asserts
1 parent ca9d39c commit 1e80069

File tree

2 files changed

+239
-24
lines changed

2 files changed

+239
-24
lines changed

pg_lake_iceberg/src/rest_catalog/rest_catalog.c

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -852,10 +852,7 @@ GetAddSnapshotCatalogRequest(IcebergSnapshot * newSnapshot, Oid relationId)
852852
appendStringInfo(body, ",\"timestamp-ms\":%ld", (long) (PostgresTimestampToIcebergTimestampMs())); /* coarse ms */
853853
appendStringInfo(body, ",\"manifest-list\":\"%s\"", newSnapshot->manifest_list);
854854
appendStringInfoString(body, ",\"summary\":{\"operation\": \"append\"}");
855-
856-
if (newSnapshot->schema_id > 0)
857-
appendStringInfo(body, ",\"schema-id\":%d", newSnapshot->schema_id);
858-
855+
appendStringInfo(body, ",\"schema-id\":%d", newSnapshot->schema_id);
859856
appendStringInfoString(body, "}}, "); /* end add-snapshot */
860857

861858
appendStringInfo(body, "{\"action\":\"set-snapshot-ref\", \"type\":\"branch\", \"ref-name\":\"main\", \"snapshot-id\":%lld}", newSnapshot->snapshot_id);

pg_lake_table/tests/pytests/test_polaris_catalog_writable.py

Lines changed: 238 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,14 @@ def test_polaris_catalog_running(pg_conn, s3, polaris_session, installcheck):
3737

3838
# fetch_data_files_used
3939
def test_writable_rest_basic_flow(
40-
pg_conn, s3, polaris_session, set_polaris_gucs, with_default_location, installcheck
40+
pg_conn,
41+
superuser_conn,
42+
s3,
43+
polaris_session,
44+
set_polaris_gucs,
45+
with_default_location,
46+
installcheck,
47+
create_http_helper_functions,
4148
):
4249

4350
if installcheck:
@@ -140,6 +147,35 @@ def test_writable_rest_basic_flow(
140147
assert len(res) == 4
141148
assert res == [[1000], [1001], [1002], [1003]]
142149

150+
# positional delete
151+
run_command(
152+
f"""
153+
INSERT INTO test_writable_rest_basic_flow.writable_rest SELECT i FROM generate_series(0,100)i;
154+
""",
155+
pg_conn,
156+
)
157+
pg_conn.commit()
158+
run_command(
159+
f"""
160+
DELETE FROM test_writable_rest_basic_flow.writable_rest WHERE a = 15;
161+
""",
162+
pg_conn,
163+
)
164+
pg_conn.commit()
165+
166+
# copy-on-write
167+
run_command(
168+
f"""
169+
UPDATE test_writable_rest_basic_flow.writable_rest SET a = a + 1 WHERE a > 10;
170+
""",
171+
pg_conn,
172+
)
173+
pg_conn.commit()
174+
175+
assert_metadata_on_pg_catalog_and_rest_matches(
176+
"test_writable_rest_basic_flow", "writable_rest", superuser_conn
177+
)
178+
143179
run_command(f"""DROP SCHEMA test_writable_rest_basic_flow CASCADE""", pg_conn)
144180
pg_conn.commit()
145181

@@ -194,7 +230,7 @@ def test_writable_rest_ddl(
194230
assert columns[1][0] == "b"
195231

196232
assert_metadata_on_pg_catalog_and_rest_matches(
197-
"test_writable_rest_ddl", "writable_rest_3", superuser_conn, installcheck
233+
"test_writable_rest_ddl", "writable_rest_3", superuser_conn
198234
)
199235

200236
# multiple DDLs to a single table
@@ -233,7 +269,7 @@ def test_writable_rest_ddl(
233269
pg_conn.commit()
234270

235271
assert_metadata_on_pg_catalog_and_rest_matches(
236-
"test_writable_rest_ddl", "writable_rest_3", superuser_conn, installcheck
272+
"test_writable_rest_ddl", "writable_rest_3", superuser_conn
237273
)
238274

239275
# multiple DDLs to multiple tables
@@ -284,7 +320,7 @@ def test_writable_rest_ddl(
284320
assert columns[2][0] == "c"
285321

286322
assert_metadata_on_pg_catalog_and_rest_matches(
287-
"test_writable_rest_ddl", "writable_rest_3", superuser_conn, installcheck
323+
"test_writable_rest_ddl", "writable_rest_3", superuser_conn
288324
)
289325

290326
# modify table and DDL on a single table
@@ -300,7 +336,7 @@ def test_writable_rest_ddl(
300336
pg_conn.commit()
301337

302338
assert_metadata_on_pg_catalog_and_rest_matches(
303-
"test_writable_rest_ddl", "writable_rest", superuser_conn, installcheck
339+
"test_writable_rest_ddl", "writable_rest", superuser_conn
304340
)
305341

306342
run_command(
@@ -366,19 +402,17 @@ def test_writable_rest_ddl(
366402
pg_conn.commit()
367403

368404

369-
# TODO: extend this to cover all the metadata
370-
# currently it only does partitions
371-
# This is a variation of ExternalHeavyAssertsOnIcebergMetadataChange() in source code
372-
# we cannot apply that function to REST catalog tables, as changes happen in Post-commit
373-
374-
375405
def assert_metadata_on_pg_catalog_and_rest_matches(
376-
namespace, table_name, superuser_conn, installcheck
406+
namespace, table_name, superuser_conn
377407
):
408+
metadata = get_rest_table_metadata(namespace, table_name, superuser_conn)
378409

379-
metadata = get_rest_table_metadata(
380-
namespace, table_name, superuser_conn, installcheck
381-
)
410+
assert_data_files_match(namespace, table_name, superuser_conn, metadata)
411+
assert_schemas_equal(namespace, table_name, superuser_conn, metadata)
412+
assert_partitions_equal(namespace, table_name, superuser_conn, metadata)
413+
414+
415+
def assert_partitions_equal(namespace, table_name, superuser_conn, metadata):
382416

383417
# 1) default-spec-id check
384418
catalog_default_spec_id = run_query(
@@ -473,11 +507,194 @@ def assert_metadata_on_pg_catalog_and_rest_matches(
473507
)
474508

475509

476-
def get_rest_table_metadata(
477-
encoded_namespace, encoded_table_name, pg_conn, installcheck
478-
):
479-
if installcheck:
480-
return
510+
def assert_schemas_equal(namespace, table_name, superuser_conn, metadata):
511+
"""
512+
Compares a list of Iceberg-like schema dicts (with 'fields') to a list of rows
513+
shaped as [id, name, required, type]. Ignores ordering and normalizes type names.
514+
"""
515+
516+
def norm_type(t: str) -> str:
517+
t = str(t).strip().lower()
518+
aliases = {
519+
# common synonyms
520+
"int": "integer",
521+
"integer": "integer",
522+
"long": "bigint",
523+
"bigint": "bigint",
524+
"short": "smallint",
525+
"smallint": "smallint",
526+
"bool": "boolean",
527+
"boolean": "boolean",
528+
"float": "float",
529+
"double": "double",
530+
"str": "string",
531+
"string": "string",
532+
"timestamp_tz": "timestamp_tz",
533+
"timestamptz": "timestamp_tz",
534+
"timestamp": "timestamp",
535+
"date": "date",
536+
"time": "time",
537+
"uuid": "uuid",
538+
"binary": "binary",
539+
"decimal": "decimal",
540+
}
541+
return aliases.get(t, t)
542+
543+
# schema checks
544+
schemas = metadata["metadata"].get("schemas", [])
545+
last_schema = [schemas[-1]]
546+
547+
catalog_schemas_rows = run_query(
548+
f"""
549+
SELECT
550+
f.field_id, a.attname, a.attnotnull, f.field_pg_type
551+
FROM
552+
lake_table.field_id_mappings f JOIN pg_attribute a ON (a.attrelid = f.table_name and a.attnum=f.pg_attnum)
553+
WHERE table_name = '{namespace}.{table_name}'::regclass
554+
""",
555+
superuser_conn,
556+
)
557+
558+
# Normalize/flatten the 'schemas' into rows
559+
schema_rows = []
560+
for s in last_schema or []:
561+
for f in s.get("fields", []):
562+
schema_rows.append(
563+
[
564+
int(f["id"]),
565+
str(f["name"]),
566+
bool(f["required"]),
567+
norm_type(f["type"]),
568+
]
569+
)
570+
571+
# Normalize the catalog rows
572+
cat_rows = []
573+
for r in catalog_schemas_rows or []:
574+
cat_rows.append(
575+
[
576+
int(r[0]),
577+
str(r[1]),
578+
bool(r[2]),
579+
norm_type(r[3]),
580+
]
581+
)
582+
583+
# Sort by (id, name) for deterministic, order-insensitive comparison
584+
schema_rows_sorted = sorted(schema_rows, key=lambda x: (x[0], x[1]))
585+
cat_rows_sorted = sorted(cat_rows, key=lambda x: (x[0], x[1]))
586+
587+
assert schema_rows_sorted == cat_rows_sorted, (
588+
"Schema mismatch.\n"
589+
f"From schemas: {schema_rows_sorted}\n"
590+
f"From catalog: {cat_rows_sorted}"
591+
)
592+
593+
594+
def assert_data_files_match(namespace, table_name, superuser_conn, metadata):
595+
596+
metadata_location = metadata["metadata-location"]
597+
598+
data_files_metadata = pg_lake_iceberg_files(superuser_conn, metadata_location)
599+
600+
data_files_pg_catalog_agg = run_query(
601+
f"""
602+
SELECT
603+
f.path,
604+
COALESCE(
605+
jsonb_object_agg(
606+
dfcs.field_id::text,
607+
to_jsonb(dfcs.lower_bound)
608+
) FILTER (WHERE dfcs.field_id IS NOT NULL),
609+
'{{}}'::jsonb
610+
) AS lower_bounds,
611+
COALESCE(
612+
jsonb_object_agg(
613+
dfcs.field_id::text,
614+
to_jsonb(dfcs.upper_bound)
615+
) FILTER (WHERE dfcs.field_id IS NOT NULL),
616+
'{{}}'::jsonb
617+
) AS upper_bounds
618+
FROM lake_table.files f
619+
LEFT JOIN lake_table.data_file_column_stats dfcs
620+
ON dfcs.table_name = f.table_name
621+
AND dfcs.path = f.path
622+
WHERE f.table_name = '{namespace}.{table_name}'::regclass
623+
GROUP BY f.path
624+
ORDER BY f.path;
625+
""",
626+
superuser_conn,
627+
)
628+
629+
def canon_json(v):
630+
# Parse stringified JSON into Python types first
631+
if not isinstance(v, (dict, list)):
632+
v = json.loads(str(v))
633+
634+
def coerce(x):
635+
# Recurse first
636+
if isinstance(x, list):
637+
return [coerce(i) for i in x]
638+
if isinstance(x, dict):
639+
return {str(k): coerce(val) for k, val in x.items()}
640+
641+
# Coerce leaf values so "100" and 100 compare equal
642+
if isinstance(x, str):
643+
s = x.strip()
644+
if s.lower() in ("true", "false"):
645+
return s.lower() == "true"
646+
try:
647+
d = Decimal(s)
648+
# Prefer ints when exact; otherwise use Decimal->float conservatively
649+
return int(d) if d == d.to_integral_value() else float(d)
650+
except InvalidOperation:
651+
return s
652+
653+
if isinstance(x, (int, float)):
654+
try:
655+
d = Decimal(str(x))
656+
return int(d) if d == d.to_integral_value() else float(d)
657+
except InvalidOperation:
658+
return x
659+
660+
# Leave booleans/None and other scalars as-is
661+
return x
662+
663+
coerced = coerce(v)
664+
return json.dumps(coerced, sort_keys=True, separators=(",", ":"))
665+
666+
# Left: from pg_lake_read_data_file_stats (ignore seq at index 1)
667+
left = sorted(
668+
(str(r[0]).strip(), canon_json(r[2]), canon_json(r[3]))
669+
for r in (data_files_metadata or [])
670+
)
671+
# Right: from the aggregated SQL above
672+
right = sorted(
673+
(str(r[0]).strip(), canon_json(r[1]), canon_json(r[2]))
674+
for r in (data_files_pg_catalog_agg or [])
675+
)
676+
677+
assert left == right, (
678+
"Data file column stats mismatch.\n"
679+
f"Only in metadata: {sorted(set(left) - set(right))[:5]}\n"
680+
f"Only in pg_catalog: {sorted(set(right) - set(left))[:5]}"
681+
)
682+
683+
684+
def pg_lake_iceberg_files(superuser_conn, metadata_location):
685+
datafile_paths = run_query(
686+
f"""
687+
SELECT * FROM lake_iceberg.data_file_stats('{metadata_location}');
688+
689+
""",
690+
superuser_conn,
691+
)
692+
693+
return datafile_paths
694+
695+
696+
def get_rest_table_metadata(encoded_namespace, encoded_table_name, pg_conn):
697+
481698
url = f"http://{server_params.POLARIS_HOSTNAME}:{server_params.POLARIS_PORT}/api/catalog/v1/{server_params.PG_DATABASE}/namespaces/{encoded_namespace}/tables/{encoded_table_name}"
482699
token = get_polaris_access_token()
483700

@@ -623,6 +840,7 @@ def create_http_helper_functions(superuser_conn, extension):
623840
DROP TYPE lake_iceberg.http_result;
624841
DROP FUNCTION IF EXISTS lake_iceberg.url_encode_path;
625842
DROP FUNCTION IF EXISTS lake_iceberg.register_namespace_to_rest_catalog;
843+
DROP FUNCTION IF EXISTS lake_iceberg.datafile_paths_from_table_metadata;
626844
""",
627845
superuser_conn,
628846
)

0 commit comments

Comments (0)