Skip to content

Commit 1d626ca

Browse files
Transactions involving different table types
1 parent 439a24c commit 1d626ca

File tree

1 file changed

+166
-0
lines changed

1 file changed

+166
-0
lines changed

pg_lake_table/tests/pytests/test_polaris_catalog_writable.py

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,110 @@ def test_writable_drop_table(
572572
assert metadata is not None
573573
ensure_table_dropped("test_writable_drop_table", "writable_rest_2", superuser_conn)
574574

575+
run_command("DROP SCHEMA test_writable_drop_table CASCADE", pg_conn)
576+
pg_conn.commit()
577+
578+
579+
# fetch_data_files_used
580+
def test_different_table_types(
581+
pg_conn,
582+
superuser_conn,
583+
s3,
584+
polaris_session,
585+
set_polaris_gucs,
586+
with_default_location,
587+
installcheck,
588+
create_http_helper_functions,
589+
adjust_object_store_settings,
590+
):
591+
592+
if installcheck:
593+
return
594+
595+
run_command(f"""CREATE SCHEMA test_different_table_types""", pg_conn)
596+
run_command(
597+
f"""CREATE TABLE test_different_table_types.writable_rest USING iceberg WITH (catalog='rest') AS SELECT 1 AS a""",
598+
pg_conn,
599+
)
600+
run_command(
601+
f"""CREATE TABLE test_different_table_types.writable_object_store USING iceberg WITH (catalog='object_store') AS SELECT 2 AS a""",
602+
pg_conn,
603+
)
604+
605+
run_command(
606+
f"""CREATE TABLE test_different_table_types.postgres_catalog_iceberg_test USING iceberg AS SELECT 3 as a""",
607+
pg_conn,
608+
)
609+
610+
run_command(
611+
f"""CREATE TABLE test_different_table_types.heap_test AS SELECT 4 as a""",
612+
pg_conn,
613+
)
614+
615+
pg_conn.commit()
616+
617+
run_command(
618+
f"""CREATE TABLE test_different_table_types.readable_rest() USING iceberg WITH (catalog='rest', read_only=True, catalog_table_name='writable_rest')""",
619+
pg_conn,
620+
)
621+
pg_conn.commit()
622+
623+
run_command(
624+
f"""CREATE TABLE test_different_table_types.writable_rest_2 USING iceberg WITH (catalog='rest') AS
625+
626+
SELECT * FROM test_different_table_types.writable_rest
627+
UNION ALL
628+
SELECT * FROM test_different_table_types.writable_object_store
629+
UNION ALL
630+
SELECT * FROM test_different_table_types.postgres_catalog_iceberg_test
631+
UNION ALL
632+
SELECT * FROM test_different_table_types.heap_test
633+
UNION ALL
634+
SELECT * FROM test_different_table_types.readable_rest
635+
""",
636+
pg_conn,
637+
)
638+
pg_conn.commit()
639+
640+
res = run_query(
641+
"SELECT * FROM test_different_table_types.writable_rest_2 ORDER BY a DESC",
642+
pg_conn,
643+
)
644+
assert res == [[4], [3], [2], [1], [1]]
645+
646+
run_command(
647+
"""
648+
UPDATE test_different_table_types.writable_rest_2 bar SET a = foo.a + 1
649+
FROM ( SELECT * FROM test_different_table_types.writable_rest
650+
UNION ALL
651+
SELECT * FROM test_different_table_types.writable_object_store
652+
UNION ALL
653+
SELECT * FROM test_different_table_types.postgres_catalog_iceberg_test
654+
UNION ALL
655+
SELECT * FROM test_different_table_types.heap_test
656+
UNION ALL
657+
SELECT * FROM test_different_table_types.readable_rest
658+
659+
) as foo WHERE foo.a = bar.a """,
660+
pg_conn,
661+
)
662+
663+
res = run_query(
664+
"SELECT * FROM test_different_table_types.writable_rest_2 ORDER BY a DESC",
665+
pg_conn,
666+
)
667+
assert res == [[5], [4], [3], [2], [2]]
668+
669+
pg_conn.commit()
670+
res = run_query(
671+
"SELECT * FROM test_different_table_types.writable_rest_2 ORDER BY a DESC",
672+
pg_conn,
673+
)
674+
assert res == [[5], [4], [3], [2], [2]]
675+
676+
run_command(f"""DROP SCHEMA test_different_table_types CASCADE""", pg_conn)
677+
pg_conn.commit()
678+
575679

576680
def assert_metadata_on_pg_catalog_and_rest_matches(
577681
namespace, table_name, superuser_conn
@@ -1102,3 +1206,65 @@ def grant_access_to_tables_internal(
11021206
superuser_conn,
11031207
)
11041208
superuser_conn.commit()
1209+
1210+
1211+
@pytest.fixture(scope="function")
1212+
def adjust_object_store_settings(superuser_conn):
1213+
superuser_conn.autocommit = True
1214+
1215+
# catalog=object_store requires the IcebergDefaultLocationPrefix set
1216+
# and accessible by other sessions (e.g., push catalog worker),
1217+
# and with_default_location only does a session level
1218+
run_command(
1219+
f"""ALTER SYSTEM SET pg_lake_iceberg.object_store_catalog_location_prefix = 's3://{TEST_BUCKET}';""",
1220+
superuser_conn,
1221+
)
1222+
1223+
# to be able to read the same tables that we write, use the same prefix
1224+
run_command(
1225+
f"""
1226+
ALTER SYSTEM SET pg_lake_iceberg.internal_object_store_catalog_prefix = 'tmp';
1227+
""",
1228+
superuser_conn,
1229+
)
1230+
1231+
run_command(
1232+
f"""
1233+
ALTER SYSTEM SET pg_lake_iceberg.external_object_store_catalog_prefix = 'tmp';
1234+
""",
1235+
superuser_conn,
1236+
)
1237+
1238+
superuser_conn.autocommit = False
1239+
1240+
run_command("SELECT pg_reload_conf()", superuser_conn)
1241+
1242+
# unfortunate, but Postgres requires a bit of time before
1243+
# bg workers get the reload
1244+
run_command("SELECT pg_sleep(0.1)", superuser_conn)
1245+
superuser_conn.commit()
1246+
yield
1247+
1248+
superuser_conn.autocommit = True
1249+
run_command(
1250+
f"""
1251+
ALTER SYSTEM RESET pg_lake_iceberg.object_store_catalog_location_prefix;
1252+
""",
1253+
superuser_conn,
1254+
)
1255+
run_command(
1256+
f"""
1257+
ALTER SYSTEM RESET pg_lake_iceberg.internal_object_store_catalog_prefix;
1258+
""",
1259+
superuser_conn,
1260+
)
1261+
run_command(
1262+
f"""
1263+
ALTER SYSTEM RESET pg_lake_iceberg.external_object_store_catalog_prefix;
1264+
""",
1265+
superuser_conn,
1266+
)
1267+
superuser_conn.autocommit = False
1268+
1269+
run_command("SELECT pg_reload_conf()", superuser_conn)
1270+
superuser_conn.commit()

0 commit comments

Comments
 (0)