From 8e5753bed7c9ffb7a9072fd9a1533f5170564395 Mon Sep 17 00:00:00 2001
From: Onder KALACI
Date: Mon, 3 Nov 2025 16:23:37 +0300
Subject: [PATCH 1/6] Bump polaris version from 1.1 to 1.2

We have Polaris running as part of the regression tests. Polaris 1.2 has
been released, so let's stay up-to-date with it.

There is one relatively important change that impacts us:

```
With version 1.2, creating or altering a namespace with a custom location
outside its parent location is now prohibited by default. To restore the
old behavior, you can set the ALLOW_NAMESPACE_CUSTOM_LOCATION flag to true.
```

We used to set the namespace location ourselves. Instead, let's rely on
Polaris, which by default uses:

```
<storage location>/<warehouse>/<namespace>/
```

That is what we look for anyway. The `warehouse` is the `database name` for
our purposes.
---
 .../src/rest_catalog/rest_catalog.c        | 26 ++++-----
 .../tests/pytests/test_polaris_catalog.py  | 58 ++-----------------
 test_common/rest_catalog/Makefile          |  6 +-
 test_common/utils_pytest.py                | 17 +++---
 4 files changed, 27 insertions(+), 80 deletions(-)

diff --git a/pg_lake_iceberg/src/rest_catalog/rest_catalog.c b/pg_lake_iceberg/src/rest_catalog/rest_catalog.c
index 90946b48..36ff23da 100644
--- a/pg_lake_iceberg/src/rest_catalog/rest_catalog.c
+++ b/pg_lake_iceberg/src/rest_catalog/rest_catalog.c
@@ -41,7 +41,7 @@
 char *RestCatalogClientId = NULL;
 char *RestCatalogClientSecret = NULL;

-static void CreateNamespaceOnRestCatalog(const char *catalogName, const char *namespaceName, const char *allowedLocation);
+static void CreateNamespaceOnRestCatalog(const char *catalogName, const char *namespaceName);
 static char *EncodeBasicAuth(const char *clientId, const char *clientSecret);
 static char *JsonbGetStringByPath(const char *jsonb_text, int nkeys,...);
 static char *GetRestCatalogName(Oid relationId);
@@ -60,11 +60,6 @@ static void ReportHTTPError(HttpResult httpResult, int level);
 void
 RegisterNamespaceToRestCatalog(const char *catalogName, const char *namespaceName, bool hasRestCatalogReadOnlyOption)
 {
-	/* Rest catalog restricts writing to this location for the given namespace */
-	const char *allowedLocation =
-		psprintf("%s/%s/%s", IcebergDefaultLocationPrefix, URLEncodePath(catalogName),
-				 URLEncodePath(namespaceName));
-
 	/*
	 * First, we need to check if the namespace already exists in Rest Catalog
	 * via a GET request.
@@ -88,7 +83,7 @@ RegisterNamespaceToRestCatalog(const char *catalogName, const char *namespaceNam
			/*
			 * Does not exist, so we'll create it.
			 */
-			CreateNamespaceOnRestCatalog(catalogName, namespaceName, allowedLocation);
+			CreateNamespaceOnRestCatalog(catalogName, namespaceName);

			break;
		}
@@ -97,28 +92,32 @@ RegisterNamespaceToRestCatalog(const char *catalogName, const char *namespaceNam
		{
			/*
			 * Verify allowed location matches, otherwise raise an error.
-			 * We raise error because we use the location as the place
-			 * where tables are stored. So, we cannot afford to have
+			 * We raise an error because we use the default location as the
+			 * place where tables are stored. So, we cannot afford to have
			 * different locations for the same namespace.
			 */
			char	   *serverAllowedLocation =
				JsonbGetStringByPath(httpResult.body, 2, "properties", "location");

+			const char *defaultAllowedLocation =
+				psprintf("%s/%s/%s", IcebergDefaultLocationPrefix, catalogName, namespaceName);
+
+
			/*
			 * Compare by ignoring the trailing `/` char that the server
			 * might have for internal iceberg tables. For external tables,
			 * we have no control over the location.
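			 *
			 * For illustration (hypothetical values): with a default location
			 * of s3://bucket/mydb/nsp, a server-reported location of
			 * s3://bucket/mydb/nsp or s3://bucket/mydb/nsp/ passes this check,
			 * whereas s3://bucket/mydb/other (different prefix) or
			 * s3://bucket/mydb/nsp/sub (more than one extra trailing
			 * character) raises the error below.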
*/ if (!hasRestCatalogReadOnlyOption && - (strlen(serverAllowedLocation) - strlen(allowedLocation) > 1 || - strncmp(serverAllowedLocation, allowedLocation, strlen(allowedLocation)) != 0)) + (strlen(serverAllowedLocation) - strlen(defaultAllowedLocation) > 1 || + strncmp(serverAllowedLocation, defaultAllowedLocation, strlen(defaultAllowedLocation)) != 0)) { ereport(ERROR, (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), errmsg("namespace \"%s\" is already registered with a different location", namespaceName), errdetail_internal("Expected location: %s, but got: %s", - allowedLocation, serverAllowedLocation))); + defaultAllowedLocation, serverAllowedLocation))); } break; @@ -221,7 +220,7 @@ GetMetadataLocationFromRestCatalog(const char *restCatalogName, const char *name * an error is raised. */ static void -CreateNamespaceOnRestCatalog(const char *catalogName, const char *namespaceName, const char *allowedLocation) +CreateNamespaceOnRestCatalog(const char *catalogName, const char *namespaceName) { /* POST create */ StringInfoData body; @@ -240,7 +239,6 @@ CreateNamespaceOnRestCatalog(const char *catalogName, const char *namespaceName, appendJsonKey(&body, "properties"); appendStringInfoChar(&body, '{'); /* start properties object */ - appendJsonString(&body, "location", allowedLocation); appendStringInfoChar(&body, '}'); /* close properties object */ appendStringInfoChar(&body, '}'); /* close body */ diff --git a/pg_lake_table/tests/pytests/test_polaris_catalog.py b/pg_lake_table/tests/pytests/test_polaris_catalog.py index 46d47f79..c88e5314 100644 --- a/pg_lake_table/tests/pytests/test_polaris_catalog.py +++ b/pg_lake_table/tests/pytests/test_polaris_catalog.py @@ -74,7 +74,7 @@ def test_create_namespace( assert ( location.lower() - == f"s3://testbucketcdw/{server_params.PG_DATABASE}/{encoded_namespace}/".lower() + == f"s3://testbucketcdw/{server_params.PG_DATABASE}/{namespace}/".lower() ) res = run_command( @@ -150,7 +150,7 @@ def test_create_namespace_in_tx( assert ( location.lower() - == f"s3://testbucketcdw/{server_params.PG_DATABASE}/{encoded_namespace}/".lower() + == f"s3://testbucketcdw/{server_params.PG_DATABASE}/{namespace}/".lower() ) run_command(f"""DROP SCHEMA "{namespace}" CASCADE""", pg_conn) @@ -189,61 +189,17 @@ def test_create_namespace_rollback( assert ( location.lower() - == f"s3://testbucketcdw/{server_params.PG_DATABASE}/{encoded_namespace}/".lower() + == f"s3://testbucketcdw/{server_params.PG_DATABASE}/{namespace}/".lower() ) -def test_create_namespace_on_existing_location( - pg_conn, - set_polaris_gucs, - with_default_location, - s3, - polaris_session, - installcheck, - create_http_helper_functions, -): - - if installcheck: - return - namespace = "tmp_namespace" - - run_command(f'''CREATE SCHEMA "{namespace}"''', pg_conn) - - run_command( - f"""SELECT lake_iceberg.register_namespace_to_rest_catalog('{server_params.PG_DATABASE}', '{namespace}')""", - pg_conn, - ) - - pg_conn.commit() - - # now, update the location to somewhere else - set_namespace_location(namespace, pg_conn) - - # now, drop and re-create should fail - run_command(f"DROP SCHEMA {namespace} CASCADE", pg_conn) - pg_conn.commit() - - run_command(f'''CREATE SCHEMA "{namespace}"''', pg_conn) - err = run_command( - f"""SELECT lake_iceberg.register_namespace_to_rest_catalog('{server_params.PG_DATABASE}', '{namespace}')""", - pg_conn, - raise_error=False, - ) - - pg_conn.commit() - - assert "is already registered with a different location" in str(err) - - pg_conn.rollback() - - existing_namespaces = [ 
"regular_nsp_name", - "nonregular_nsp !~*() name:$Uses_Of@", + "nonregular_nsp!~*()name:$Uses_Of@", ] existing_table_names = [ "regular_tbl_name", - "nonregular_tbl !~*() name:$Uses_Of@", + "nonregular_tbl!~*()name:$Uses_Of@", ] @@ -842,10 +798,6 @@ def create_rest_catalog_table( identifier=f"{namespace}.{tbl_name}", schema=schema, partition_spec=part_spec, - location=f"s3://{TEST_BUCKET}/{server_params.PG_DATABASE}_catalog/{quote(namespace)}/", - properties={ - "location": f"s3://{TEST_BUCKET}/{server_params.PG_DATABASE}_catalog/{quote(namespace)}/" - }, ) if drop_columns: diff --git a/test_common/rest_catalog/Makefile b/test_common/rest_catalog/Makefile index a27bad0d..fad0de6d 100644 --- a/test_common/rest_catalog/Makefile +++ b/test_common/rest_catalog/Makefile @@ -10,7 +10,7 @@ include $(PGXS) # change to trigger CI cache # for Polaris version changes -JAR_VERSION=1.1.0 +JAR_VERSION=1.2.0 # Polaris build and install all: @@ -18,8 +18,8 @@ all: install: cp polaris/gradlew $(PG_BINDIR)/. && \ - cp polaris/runtime/admin/build/polaris-admin-$(JAR_VERSION)-incubating-SNAPSHOT-runner.jar $(PG_BINDIR)/polaris-admin.jar && \ - cp polaris/runtime/server/build/polaris-server-$(JAR_VERSION)-incubating-SNAPSHOT-runner.jar $(PG_BINDIR)/polaris-server.jar + cp polaris/runtime/admin/build/polaris-admin-$(JAR_VERSION)-incubating-runner.jar $(PG_BINDIR)/polaris-admin.jar && \ + cp polaris/runtime/server/build/polaris-server-$(JAR_VERSION)-incubating-runner.jar $(PG_BINDIR)/polaris-server.jar clean: cd polaris && ./gradlew clean diff --git a/test_common/utils_pytest.py b/test_common/utils_pytest.py index 908d021b..92777ebc 100644 --- a/test_common/utils_pytest.py +++ b/test_common/utils_pytest.py @@ -214,7 +214,9 @@ def start_polaris_server_in_background(): "CLIENT_ID": "client_id", "CLIENT_SECRET": "client_secret", "AWS_ROLE_ARN": AWS_ROLE_ARN, - "STORAGE_LOCATION": f"s3://{TEST_BUCKET}", + # todo: this polaris server only works for the default database + # in the regression tests. 
+ "STORAGE_LOCATION": f"s3://{TEST_BUCKET}/{server_params.PG_DATABASE}", "POLARIS_HOSTNAME": server_params.POLARIS_HOSTNAME, "POLARIS_PORT": str(server_params.POLARIS_PORT), "POLARIS_PRINCIPAL_CREDS_FILE": server_params.POLARIS_PRINCIPAL_CREDS_FILE, @@ -908,6 +910,9 @@ def create_mock_s3(): # Extract the KeyId from the response server_params.MANAGED_STORAGE_CMK_ID = response["KeyMetadata"]["KeyId"] + # Setting up STS + assume-role is not strictly required for Polaris version 1.2+ + # But we prefer to keep for now, as that's more closer to production workloads + # Create IAM role + STS assume-role # required for Polaris iam = boto3.client( @@ -1431,6 +1436,7 @@ def create_iceberg_rest_catalog(namespace): "oauth2-server-uri": oauth_token_url, # token endpoint "scope": "PRINCIPAL_ROLE:ALL", # typical Polaris scope "credential": f"{client_id}:{client_secret}", # "id:secret" + # S3/Moto settings (same as your JDBC helper) "s3.endpoint": f"http://localhost:{MOTO_PORT}", "s3.access-key-id": TEST_AWS_ACCESS_KEY_ID, @@ -1444,15 +1450,6 @@ def create_iceberg_rest_catalog(namespace): except NamespaceAlreadyExistsError: pass # ignore only this case - # this seems like a bug in pyiceberg, we need to - # update the location to get it reflected - catalog.update_namespace_properties( - f"{namespace}", - updates={ - "location": f"s3://{TEST_BUCKET}/{server_params.PG_DATABASE}_catalog/{quote(namespace)}" - }, - ) - return catalog From 0a6498ef67db8a7cd3d8c685a5a6fd50cafd6c6d Mon Sep 17 00:00:00 2001 From: Onder KALACI Date: Mon, 10 Nov 2025 12:08:56 +0300 Subject: [PATCH 2/6] add missing adding new polaris commits --- test_common/rest_catalog/polaris | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test_common/rest_catalog/polaris b/test_common/rest_catalog/polaris index 756e535f..354a5ef6 160000 --- a/test_common/rest_catalog/polaris +++ b/test_common/rest_catalog/polaris @@ -1 +1 @@ -Subproject commit 756e535fabe150eefdfe16f3c61c1207217d2e2f +Subproject commit 354a5ef6b337bf690b7a12fefe2c984e2139b029 From 46cd0b0c4a9f0825983396c276f8aa2b91a63876 Mon Sep 17 00:00:00 2001 From: Onder KALACI Date: Mon, 10 Nov 2025 11:41:16 +0300 Subject: [PATCH 3/6] Enable create table writable rest catalog --- pg_lake_table/src/ddl/create_table.c | 31 ++++++++++--------- pg_lake_table/src/fdw/option.c | 15 +++------ .../tests/pytests/test_polaris_catalog.py | 4 +-- 3 files changed, 22 insertions(+), 28 deletions(-) diff --git a/pg_lake_table/src/ddl/create_table.c b/pg_lake_table/src/ddl/create_table.c index 583e89d9..f68f81e2 100644 --- a/pg_lake_table/src/ddl/create_table.c +++ b/pg_lake_table/src/ddl/create_table.c @@ -656,18 +656,19 @@ ProcessCreateIcebergTableFromForeignTableStmt(ProcessUtilityParams * params) bool hasRestCatalogOption = HasRestCatalogTableOption(createStmt->options); bool hasObjectStoreCatalogOption = HasObjectStoreCatalogTableOption(createStmt->options); - bool hasExternalCatalogReadOnlyOption = HasReadOnlyOption(createStmt->options); - /* - * Read-only external catalog tables are a special case of Iceberg tables. - * They are recognized as Iceberg tables, but are not registered in any - * internal catalogs (e.g., lake_iceberg.tables). Instead, the table is - * created only in PostgreSQL’s system catalogs. When the table is - * queried, its metadata is fetched on demand from the external catalog. 
- */ - if ((hasObjectStoreCatalogOption || - (hasRestCatalogOption && hasExternalCatalogReadOnlyOption))) + if (hasObjectStoreCatalogOption || hasRestCatalogOption) { + /* + * Read-only external catalog tables are a special case of Iceberg + * tables. They are recognized as Iceberg tables, but are not + * registered in any internal catalogs (e.g., lake_iceberg.tables). + * Instead, the table is created only in PostgreSQL’s system + * catalogs. When the table is queried, its metadata is fetched on + * demand from the external catalog. + */ + bool hasExternalCatalogReadOnlyOption = HasReadOnlyOption(createStmt->options); + char *metadataLocation = NULL; char *catalogNamespace = NULL; char *catalogTableName = NULL; @@ -725,7 +726,7 @@ ProcessCreateIcebergTableFromForeignTableStmt(ProcessUtilityParams * params) catalogName = catalogNameProvided; } - if (hasRestCatalogOption) + if (hasRestCatalogOption && hasExternalCatalogReadOnlyOption) { ErrorIfRestNamespaceDoesNotExist(catalogName, catalogNamespace); @@ -742,7 +743,7 @@ ProcessCreateIcebergTableFromForeignTableStmt(ProcessUtilityParams * params) catalogTableName); } - if (hasObjectStoreCatalogOption && !hasExternalCatalogReadOnlyOption) + if (!hasExternalCatalogReadOnlyOption) { /* * For writable object store catalog tables, we need to continue @@ -757,11 +758,11 @@ ProcessCreateIcebergTableFromForeignTableStmt(ProcessUtilityParams * params) catalogNameProvided != NULL) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("writable object store catalog iceberg tables do not " - "allow explicit catalog options"))); + errmsg("writable %s catalog iceberg tables do not " + "allow explicit catalog options", hasObjectStoreCatalogOption ? "object store" : "REST"))); } } - else if (createStmt->base.tableElts == NIL) + else if (createStmt->base.tableElts == NIL && hasExternalCatalogReadOnlyOption) { List *dataFileColumns = DescribeColumnsFromIcebergMetadataURI(metadataLocation, false); diff --git a/pg_lake_table/src/fdw/option.c b/pg_lake_table/src/fdw/option.c index fc2fa1bb..dfadabf4 100644 --- a/pg_lake_table/src/fdw/option.c +++ b/pg_lake_table/src/fdw/option.c @@ -835,12 +835,6 @@ pg_lake_iceberg_validator(PG_FUNCTION_ARGS) !readOnlyExternalCatalogTable) icebergCatalogType = OBJECT_STORE_READ_WRITE; - if (catalog == ForeignTableRelationId && - icebergCatalogType == REST_CATALOG_READ_WRITE) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("rest catalog iceberg tables are read only, use CREATE TABLE ... 
WITH (catalog = 'rest', read_only=true) to create a read-only table")));

 	if (catalog == ForeignTableRelationId && locationProvided == false &&
 		(icebergCatalogType == POSTGRES_CATALOG || icebergCatalogType == OBJECT_STORE_READ_WRITE))
 		ereport(ERROR,
@@ -857,8 +851,7 @@ pg_lake_iceberg_validator(PG_FUNCTION_ARGS)

 	if (catalog == ForeignTableRelationId)
 	{
-		if (icebergCatalogType == REST_CATALOG_READ_ONLY || icebergCatalogType == REST_CATALOG_READ_WRITE ||
-			icebergCatalogType == OBJECT_STORE_READ_ONLY)
+		if (icebergCatalogType == REST_CATALOG_READ_ONLY || icebergCatalogType == OBJECT_STORE_READ_ONLY)
 		{
 			/*
 			 * catalog_namespace, catalog_table_name and catalog_name is
@@ -887,17 +880,17 @@ pg_lake_iceberg_validator(PG_FUNCTION_ARGS)
 			if (catalogNamespace)
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						 errmsg("\"catalog_namespace\" option is only valid for catalog=\"rest\"")));
+						 errmsg("\"catalog_namespace\" option is only valid for read-only catalog=\"rest\"")));

 			if (catalogTableName)
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						 errmsg("\"catalog_table_name\" option is only valid for catalog=\"rest\"")));
+						 errmsg("\"catalog_table_name\" option is only valid for read-only catalog=\"rest\"")));

 			if (catalogName)
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						 errmsg("\"catalog_name\" option is only valid for catalog=\"rest\"")));
+						 errmsg("\"catalog_name\" option is only valid for read-only catalog=\"rest\"")));
 		}
 	}

diff --git a/pg_lake_table/tests/pytests/test_polaris_catalog.py b/pg_lake_table/tests/pytests/test_polaris_catalog.py
index c88e5314..9d146f94 100644
--- a/pg_lake_table/tests/pytests/test_polaris_catalog.py
+++ b/pg_lake_table/tests/pytests/test_polaris_catalog.py
@@ -98,7 +98,7 @@ def test_create_namespace(
         pg_conn,
         raise_error=False,
     )
-    assert "rest catalog iceberg tables are read only" in str(res)
+    assert "writable REST catalog iceberg tables do not" in str(res)
     pg_conn.rollback()

     res = run_command(
@@ -106,7 +106,7 @@ def test_create_namespace(
         pg_conn,
         raise_error=False,
     )
-    assert "rest catalog iceberg tables are read only" in str(res)
+    assert "writable REST catalog iceberg tables do not" in str(res)
     pg_conn.rollback()

     run_command(f"""DROP SCHEMA "{namespace}" CASCADE""", pg_conn)

From 5feb31702b457cf1f7596633c249d5db1f1471c2 Mon Sep 17 00:00:00 2001
From: Onder KALACI
Date: Mon, 10 Nov 2025 12:04:10 +0300
Subject: [PATCH 4/6] Align rest catalog C functions to option definitions

---
 .../src/rest_catalog/rest_catalog.c | 97 ++++++++++++-------
 1 file changed, 64 insertions(+), 33 deletions(-)

diff --git a/pg_lake_iceberg/src/rest_catalog/rest_catalog.c b/pg_lake_iceberg/src/rest_catalog/rest_catalog.c
index 36ff23da..4caf120e 100644
--- a/pg_lake_iceberg/src/rest_catalog/rest_catalog.c
+++ b/pg_lake_iceberg/src/rest_catalog/rest_catalog.c
@@ -488,55 +488,75 @@ GetIcebergCatalogType(Oid relationId)

 /*
-* Users can provide different names for the table and the catalog names.
-* This function first checks if users provided a custom catalog table name.
-* If so, returns that, otherwise returns Postgres table name.
+* Read-only rest catalog tables always use the catalog_table_name option
+* as the table name in the external catalog. Writable rest catalog tables
+* use the Postgres table name as the catalog table name.
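+*
+* For example (hypothetical names): a read-only table created with
+* catalog_table_name 'orders' resolves to "orders" in the external catalog,
+* while a writable table public.orders resolves to its Postgres relation
+* name "orders" via get_rel_name().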
*/
char *
GetRestCatalogTableName(Oid relationId)
{
-	Assert(GetIcebergCatalogType(relationId) == REST_CATALOG_READ_ONLY ||
-		   GetIcebergCatalogType(relationId) == REST_CATALOG_READ_WRITE);
+	IcebergCatalogType catalogType = GetIcebergCatalogType(relationId);

-	ForeignTable *foreignTable = GetForeignTable(relationId);
-	List	   *options = foreignTable->options;
+	Assert(catalogType == REST_CATALOG_READ_ONLY ||
+		   catalogType == REST_CATALOG_READ_WRITE);

-	char	   *catalogTableName = GetStringOption(options, "catalog_table_name", false);
+	if (catalogType == REST_CATALOG_READ_ONLY)
+	{
+		ForeignTable *foreignTable = GetForeignTable(relationId);
+		List	   *options = foreignTable->options;

-	/* user provided the custom catalog table name */
-	if (!catalogTableName)
-		ereport(ERROR,
-				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				 errmsg("catalog_table_name option is required for rest catalog iceberg tables")));
+		char	   *catalogTableName = GetStringOption(options, "catalog_table_name", false);

-	return catalogTableName;
+		/* the catalog_table_name option is required for read-only tables */
+		if (!catalogTableName)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("catalog_table_name option is required for rest catalog iceberg tables")));
+		return catalogTableName;
+	}
+	else
+	{
+		/* for writable rest catalog tables, we use the Postgres table name */
+		return get_rel_name(relationId);
+	}
}


/*
-* Users can provide different names for the table and the catalog names.
-* This function first checks if users provided a custom catalog namespace.
-* If so, returns that, otherwise returns Postgres schema name.
+* Read-only rest catalog tables always use the catalog_namespace option
+* as the namespace in the external catalog. Writable rest catalog tables
+* use the Postgres schema name as the namespace.
*/
char *
GetRestCatalogNamespace(Oid relationId)
{
-	Assert(GetIcebergCatalogType(relationId) == REST_CATALOG_READ_ONLY ||
-		   GetIcebergCatalogType(relationId) == REST_CATALOG_READ_WRITE);
+	IcebergCatalogType catalogType = GetIcebergCatalogType(relationId);

-	ForeignTable *foreignTable = GetForeignTable(relationId);
-	List	   *options = foreignTable->options;
+	Assert(catalogType == REST_CATALOG_READ_ONLY ||
+		   catalogType == REST_CATALOG_READ_WRITE);
+
+	if (catalogType == REST_CATALOG_READ_ONLY)
+	{

-	char	   *catalogNamespace = GetStringOption(options, "catalog_namespace", false);
+		ForeignTable *foreignTable = GetForeignTable(relationId);
+		List	   *options = foreignTable->options;

-	/* user provided the custom catalog namespace */
-	if (!catalogNamespace)
-		ereport(ERROR,
-				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				 errmsg("catalog_namespace option is required for rest catalog iceberg tables")));
+		char	   *catalogNamespace = GetStringOption(options, "catalog_namespace", false);

-	return catalogNamespace;
+		/* the catalog_namespace option is required for read-only tables */
+		if (!catalogNamespace)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("catalog_namespace option is required for rest catalog iceberg tables")));
+
+		return catalogNamespace;
+	}
+	else
+	{
+		/* for writable rest catalog tables, we use the Postgres schema name */
+		return get_namespace_name(get_rel_namespace(relationId));
+	}
}

bool
@@ -558,14 +578,21 @@ HasReadOnlyOption(List *options)


/*
-* Users can provide different names for the catalog. If not provided,
-* we use the database name as the catalog name.
+* Read-only rest catalog tables always use the catalog_name option
+* as the catalog name in the external catalog.
Writable rest catalog tables +* use the current database name as the catalog name. */ static char * GetRestCatalogName(Oid relationId) { - if (relationId != InvalidOid) + IcebergCatalogType catalogType = GetIcebergCatalogType(relationId); + + Assert(catalogType == REST_CATALOG_READ_ONLY || + catalogType == REST_CATALOG_READ_WRITE); + + if (catalogType == REST_CATALOG_READ_ONLY) { + Assert(GetIcebergCatalogType(relationId) == REST_CATALOG_READ_ONLY || GetIcebergCatalogType(relationId) == REST_CATALOG_READ_WRITE); @@ -575,8 +602,12 @@ GetRestCatalogName(Oid relationId) char *catalogName = GetStringOption(options, "catalog_name", false); /* user provided the custom catalog name */ - if (catalogName) - return catalogName; + if (!catalogName) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("catalog_name option is required for rest catalog iceberg tables"))); + + return catalogName; } return get_database_name(MyDatabaseId); From fab6104329e29f71eaa4c0638de2e517c5c6ec61 Mon Sep 17 00:00:00 2001 From: Onder KALACI Date: Mon, 10 Nov 2025 13:05:08 +0300 Subject: [PATCH 5/6] Actually create the namespace on table creation --- .../pg_lake/rest_catalog/rest_catalog.h | 1 + .../src/rest_catalog/rest_catalog.c | 3 +- pg_lake_table/src/ddl/create_table.c | 28 +++++++++++++++++-- .../tests/pytests/test_polaris_catalog.py | 8 +++--- 4 files changed, 32 insertions(+), 8 deletions(-) diff --git a/pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h b/pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h index 5fd7e66b..0baaf881 100644 --- a/pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h +++ b/pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h @@ -38,6 +38,7 @@ extern PGDLLEXPORT void RegisterNamespaceToRestCatalog(const char *catalogName, bool hasRestCatalogReadOnlyOption); extern PGDLLEXPORT void ErrorIfRestNamespaceDoesNotExist(const char *catalogName, const char *namespaceName); extern PGDLLEXPORT IcebergCatalogType GetIcebergCatalogType(Oid relationId); +extern PGDLLEXPORT char *GetRestCatalogName(Oid relationId); extern PGDLLEXPORT char *GetRestCatalogNamespace(Oid relationId); extern PGDLLEXPORT char *GetRestCatalogTableName(Oid relationId); extern PGDLLEXPORT bool HasRestCatalogTableOption(List *options); diff --git a/pg_lake_iceberg/src/rest_catalog/rest_catalog.c b/pg_lake_iceberg/src/rest_catalog/rest_catalog.c index 4caf120e..860dfba1 100644 --- a/pg_lake_iceberg/src/rest_catalog/rest_catalog.c +++ b/pg_lake_iceberg/src/rest_catalog/rest_catalog.c @@ -44,7 +44,6 @@ char *RestCatalogClientSecret = NULL; static void CreateNamespaceOnRestCatalog(const char *catalogName, const char *namespaceName); static char *EncodeBasicAuth(const char *clientId, const char *clientSecret); static char *JsonbGetStringByPath(const char *jsonb_text, int nkeys,...); -static char *GetRestCatalogName(Oid relationId); static List *PostHeadersWithAuth(void); static List *GetHeadersWithAuth(void); static void ReportHTTPError(HttpResult httpResult, int level); @@ -582,7 +581,7 @@ HasReadOnlyOption(List *options) * as the catalog name in the external catalog. Writable rest catalog tables * use the current database name as the catalog name. 
*/ -static char * +char * GetRestCatalogName(Oid relationId) { IcebergCatalogType catalogType = GetIcebergCatalogType(relationId); diff --git a/pg_lake_table/src/ddl/create_table.c b/pg_lake_table/src/ddl/create_table.c index f68f81e2..25727302 100644 --- a/pg_lake_table/src/ddl/create_table.c +++ b/pg_lake_table/src/ddl/create_table.c @@ -659,6 +659,8 @@ ProcessCreateIcebergTableFromForeignTableStmt(ProcessUtilityParams * params) if (hasObjectStoreCatalogOption || hasRestCatalogOption) { + Oid namespaceId = RangeVarGetAndCheckCreationNamespace(createStmt->base.relation, NoLock, NULL); + /* * Read-only external catalog tables are a special case of Iceberg * tables. They are recognized as Iceberg tables, but are not @@ -684,8 +686,6 @@ ProcessCreateIcebergTableFromForeignTableStmt(ProcessUtilityParams * params) */ if (catalogNamespaceProvided == NULL && hasExternalCatalogReadOnlyOption) { - Oid namespaceId = RangeVarGetAndCheckCreationNamespace(createStmt->base.relation, NoLock, NULL); - catalogNamespace = get_namespace_name(namespaceId); /* add catalog_namespace table options */ @@ -761,6 +761,30 @@ ProcessCreateIcebergTableFromForeignTableStmt(ProcessUtilityParams * params) errmsg("writable %s catalog iceberg tables do not " "allow explicit catalog options", hasObjectStoreCatalogOption ? "object store" : "REST"))); } + + if (hasRestCatalogOption) + { + /* + * For writable rest catalog iceberg tables, we register the + * namespace in the rest catalog. We do that early in the + * command processing so that any errors in the registration + * are caught before we create the actual table. + * + * Note that registering a namespace is not a transactional + * operation from pg_lake's perspective. If the subsequent + * table creation fails, the namespace registration will + * remain. We accept that tradeoff for simplicity as + * re-registering an existing namespace is a no-op. For a + * writable rest catalog iceberg table, the namespace is + * always the table's schema name. Similarly, the catalog name + * is always the database name. We normally encode that in + * GetRestCatalogName() etc., but here we need to do it early + * before the table is created. 
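+				 *
+				 * For example (hypothetical names): in database "mydb",
+				 * CREATE TABLE nsp.tbl (a int) USING iceberg WITH (catalog = 'rest')
+				 * registers namespace "nsp" under catalog "mydb" before the
+				 * table itself is created.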
+ */ + RegisterNamespaceToRestCatalog(get_database_name(MyDatabaseId), + get_namespace_name(namespaceId), + hasExternalCatalogReadOnlyOption); + } } else if (createStmt->base.tableElts == NIL && hasExternalCatalogReadOnlyOption) { diff --git a/pg_lake_table/tests/pytests/test_polaris_catalog.py b/pg_lake_table/tests/pytests/test_polaris_catalog.py index 9d146f94..fe7464ed 100644 --- a/pg_lake_table/tests/pytests/test_polaris_catalog.py +++ b/pg_lake_table/tests/pytests/test_polaris_catalog.py @@ -62,7 +62,7 @@ def test_create_namespace( pg_conn.commit() run_command( - f"""SELECT lake_iceberg.register_namespace_to_rest_catalog('{server_params.PG_DATABASE}', '{namespace}')""", + f"""CREATE TABLE "{namespace}".tbl(a int) USING iceberg WITH (catalog='rest');""", pg_conn, ) pg_conn.commit() @@ -132,13 +132,13 @@ def test_create_namespace_in_tx( run_command(f'''CREATE SCHEMA "{namespace}_2"''', pg_conn) run_command( - f"""SELECT lake_iceberg.register_namespace_to_rest_catalog('{server_params.PG_DATABASE}', '{namespace}')""", + f"""CREATE TABLE "{namespace}".tbl(a int) USING iceberg WITH (catalog='rest');""", pg_conn, ) pg_conn.commit() run_command( - f"""SELECT lake_iceberg.register_namespace_to_rest_catalog('{server_params.PG_DATABASE}', '{namespace}_2')""", + f"""CREATE TABLE "{namespace}_2".tbl(a int) USING iceberg WITH (catalog='rest');""", pg_conn, ) pg_conn.commit() @@ -176,7 +176,7 @@ def test_create_namespace_rollback( run_command(f'''CREATE SCHEMA "{namespace}"''', pg_conn) run_command( - f"""SELECT lake_iceberg.register_namespace_to_rest_catalog('{server_params.PG_DATABASE}', '{namespace}')""", + f"""CREATE TABLE "{namespace}".tbl(a int) USING iceberg WITH (catalog='rest');""", pg_conn, ) From 1d29e3341f88972156e96cde09696baef61ac8fd Mon Sep 17 00:00:00 2001 From: Onder KALACI Date: Tue, 11 Nov 2025 15:17:14 +0300 Subject: [PATCH 6/6] Create table in REST catalog --- .../include/pg_lake/iceberg/metadata_spec.h | 2 + .../pg_lake/rest_catalog/rest_catalog.h | 11 +- pg_lake_iceberg/src/iceberg/catalog.c | 4 +- .../src/iceberg/metadata_operations.c | 65 +++++- .../src/iceberg/write_table_metadata.c | 44 +++- .../src/rest_catalog/rest_catalog.c | 209 +++++++++++++++++- .../track_iceberg_metadata_changes.h | 13 ++ pg_lake_table/src/ddl/create_table.c | 23 ++ pg_lake_table/src/ddl/ddl_changes.c | 30 ++- pg_lake_table/src/ddl/vacuum.c | 4 +- .../src/transaction/external_heavy_asserts.c | 9 + .../track_iceberg_metadata_changes.c | 176 +++++++++++++++ .../src/transaction/transaction_hooks.c | 3 + .../tests/pytests/test_polaris_catalog.py | 44 +++- 14 files changed, 610 insertions(+), 27 deletions(-) diff --git a/pg_lake_iceberg/include/pg_lake/iceberg/metadata_spec.h b/pg_lake_iceberg/include/pg_lake/iceberg/metadata_spec.h index 1a319894..9bed7fe5 100644 --- a/pg_lake_iceberg/include/pg_lake/iceberg/metadata_spec.h +++ b/pg_lake_iceberg/include/pg_lake/iceberg/metadata_spec.h @@ -290,3 +290,5 @@ typedef struct IcebergTableMetadata extern PGDLLEXPORT IcebergTableMetadata * ReadIcebergTableMetadata(const char *tableMetadataPath); extern PGDLLEXPORT char *WriteIcebergTableMetadataToJson(IcebergTableMetadata * metadata); +extern PGDLLEXPORT void AppendIcebergTableSchemaForRestCatalogStage(StringInfo command, IcebergTableSchema * schemas, size_t schemas_length); +extern PGDLLEXPORT void AppendIcebergPartitionSpecFields(StringInfo command, IcebergPartitionSpecField * fields, size_t fields_length); diff --git a/pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h 
b/pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h index 0baaf881..30ad0cd9 100644 --- a/pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h +++ b/pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h @@ -20,8 +20,9 @@ #include "postgres.h" #include "pg_lake/http/http_client.h" #include "pg_lake/util/rel_utils.h" +#include "pg_lake/parquet/field.h" -extern char *RestCatalogHost; +extern PGDLLEXPORT char *RestCatalogHost; extern char *RestCatalogClientId; extern char *RestCatalogClientSecret; @@ -30,10 +31,14 @@ extern char *RestCatalogClientSecret; #define REST_CATALOG_NAMESPACE_NAME "%s/api/catalog/v1/%s/namespaces/%s" #define REST_CATALOG_NAMESPACE "%s/api/catalog/v1/%s/namespaces" +#define REST_CATALOG_TABLE "%s/api/catalog/v1/%s/namespaces/%s/tables/%s" +#define REST_CATALOG_TABLES "%s/api/catalog/v1/%s/namespaces/%s/tables" + #define REST_CATALOG_AUTH_TOKEN_PATH "%s/api/catalog/v1/oauth/tokens" -#define GET_REST_CATALOG_METADATA_LOCATION "%s/api/catalog/v1/%s/namespaces/%s/tables/%s" extern PGDLLEXPORT char *RestCatalogFetchAccessToken(void); +extern PGDLLEXPORT void StartStageRestCatalogIcebergTableCreate(Oid relationId); +extern PGDLLEXPORT char *FinishStageRestCatalogIcebergTableCreateRestRequest(Oid relationId, DataFileSchema * dataFileSchema, List *partitionSpecs); extern PGDLLEXPORT void RegisterNamespaceToRestCatalog(const char *catalogName, const char *namespaceName, bool hasRestCatalogReadOnlyOption); extern PGDLLEXPORT void ErrorIfRestNamespaceDoesNotExist(const char *catalogName, const char *namespaceName); @@ -47,3 +52,5 @@ extern PGDLLEXPORT bool IsReadOnlyRestCatalogIcebergTable(Oid relationId); extern PGDLLEXPORT char *GetMetadataLocationFromRestCatalog(const char *restCatalogName, const char *namespaceName, const char *relationName); extern PGDLLEXPORT char *GetMetadataLocationForRestCatalogForIcebergTable(Oid relationId); +extern PGDLLEXPORT void ReportHTTPError(HttpResult httpResult, int level); +extern PGDLLEXPORT List *PostHeadersWithAuth(void); diff --git a/pg_lake_iceberg/src/iceberg/catalog.c b/pg_lake_iceberg/src/iceberg/catalog.c index c2529db8..5bca4c37 100644 --- a/pg_lake_iceberg/src/iceberg/catalog.c +++ b/pg_lake_iceberg/src/iceberg/catalog.c @@ -62,7 +62,7 @@ InsertInternalIcebergCatalogTable(Oid relationId, const char *metadataLocation, DECLARE_SPI_ARGS(3); SPI_ARG_VALUE(1, OIDOID, relationId, false); - SPI_ARG_VALUE(2, TEXTOID, metadataLocation, false); + SPI_ARG_VALUE(2, TEXTOID, metadataLocation, metadataLocation == NULL); SPI_ARG_VALUE(3, BOOLOID, hasCustomLocation, false); SPI_START(); @@ -475,7 +475,7 @@ bool RelationExistsInTheIcebergCatalog(Oid relationId) { bool forUpdate = false; - char *columnName = "metadata_location"; + char *columnName = "table_name"; bool errorIfNotFound = false; char *metadataLocation = diff --git a/pg_lake_iceberg/src/iceberg/metadata_operations.c b/pg_lake_iceberg/src/iceberg/metadata_operations.c index 11ae9d6f..014df08d 100644 --- a/pg_lake_iceberg/src/iceberg/metadata_operations.c +++ b/pg_lake_iceberg/src/iceberg/metadata_operations.c @@ -157,16 +157,33 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT Assert(!ShouldSkipMetadataChangeToIceberg(metadataOperationTypes)); #endif + IcebergCatalogType catalogType = GetIcebergCatalogType(relationId); + bool writableRestCatalogTable = catalogType == REST_CATALOG_READ_WRITE; /* read the iceberg metadata for the table */ bool forUpdate = true; char *metadataPath = GetIcebergCatalogMetadataLocation(relationId, 
forUpdate); - bool createNewTable = HasCreateTableOperation(metadataOperations); - IcebergTableMetadata *metadata = (createNewTable) ? - GenerateInitialIcebergTableMetadata(relationId) : - ReadIcebergTableMetadata(metadataPath); + IcebergTableMetadata *metadata = NULL; + + if (writableRestCatalogTable && !createNewTable) + { + /* + * Writable rest catalog iceberg tables have their metadata updated in + * the catalog itself. We fetch the metadata from the rest catalog. If + * new table, we generate initial metadata below. + */ + metadataPath = GetMetadataLocationForRestCatalogForIcebergTable(relationId); + metadata = ReadIcebergTableMetadata(metadataPath); + } + else + { + metadata = (createNewTable) ? + GenerateInitialIcebergTableMetadata(relationId) : + ReadIcebergTableMetadata(metadataPath); + } + int64_t prevLastUpdatedMs = metadata->last_updated_ms; @@ -184,6 +201,11 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT if (builder->createTable || builder->regenerateSchema) { AppendCurrentPostgresSchema(relationId, metadata, builder->schema); + + /* + * TODO: Create RestCatalogRequest for updating the schema in the + * writable rest catalog iceberg table. + */ } if (builder->createTable || builder->regeneratePartitionSpec) @@ -197,6 +219,11 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT IcebergPartitionSpec *newSpec = lfirst(newSpecCell); AppendPartitionSpec(metadata, newSpec); + + /* + * TODO: Create RestCatalogRequest for updating the partitioning + * in the writable rest catalog iceberg table. + */ } } @@ -216,6 +243,11 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT UpdateLatestSnapshot(metadata, newSnapshot); createNewSnapshot = true; + + /* + * TODO: Create RestCatalogRequest for setting the current_snapshot_id + * in the writable rest catalog iceberg table. + */ } /* if we need to expire old snapshots, we do it here */ @@ -225,13 +257,38 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT RemoveOldSnapshotsFromMetadata(relationId, metadata, isVerbose); if (expiredSnapshots) + { createNewSnapshot = true; + + /* + * TODO: Create RestCatalogRequest for removing old snapshots in + * the writable rest catalog iceberg table. + */ + } } /* if there were no changes to the Iceberg table, we are done */ if (!createNewSnapshot && !createNewTable) return; + if (writableRestCatalogTable) + { + + if (createNewSnapshot) + { + /* + * TODO: Create RestCatalogRequest for adding the new snapshot in + * the writable rest catalog iceberg table. + */ + } + + /* + * We are done, writable rest catalog iceberg tables have their + * metadata updated in the catalog itself. 
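+		 *
+		 * Per the Iceberg REST spec, those catalog-side changes are expressed
+		 * as commit updates; an add-snapshot update, for instance, is roughly
+		 * of the (illustrative) shape {"action": "add-snapshot",
+		 * "snapshot": {...}}.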
+ */ + return; + } + /* add the new snapshot to the snapshot log */ GenerateSnapshotLogEntries(metadata); diff --git a/pg_lake_iceberg/src/iceberg/write_table_metadata.c b/pg_lake_iceberg/src/iceberg/write_table_metadata.c index de8128de..69a153f2 100644 --- a/pg_lake_iceberg/src/iceberg/write_table_metadata.c +++ b/pg_lake_iceberg/src/iceberg/write_table_metadata.c @@ -34,7 +34,6 @@ static void AppendProperties(StringInfo command, Property * properties, size_t p static void AppendField(StringInfo command, Field * field); static void AppendIcebergStructFields(StringInfo command, FieldStructElement * fields, size_t fields_length); static void AppendIcebergTableSchemas(StringInfo command, IcebergTableSchema * schemas, size_t schemas_length); -static void AppendIcebergPartitionSpecFields(StringInfo command, IcebergPartitionSpecField * fields, size_t fields_length); static void AppendIcebergPartitionSpecs(StringInfo command, IcebergPartitionSpec * specs, size_t specs_length); static void AppendIcebergSnapshots(StringInfo command, IcebergSnapshot * snapshots, size_t snapshots_length); static void AppendIcebergSnapshotLogEntries(StringInfo command, IcebergSnapshotLogEntry * entries, size_t entries_length); @@ -244,6 +243,47 @@ AppendIcebergTableSchemas(StringInfo command, IcebergTableSchema * schemas, size appendStringInfoString(command, "]"); } + +/* +* Similar to AppendIcebergTableSchemas, but specifically for Rest Catalog stage +* API calls. +*/ +void +AppendIcebergTableSchemaForRestCatalogStage(StringInfo command, IcebergTableSchema * schemas, size_t schemas_length) +{ + appendStringInfoString(command, "\"schema\":"); + + for (size_t i = 0; i < schemas_length; i++) + { + appendStringInfoString(command, "{"); + + /* append type */ + appendJsonString(command, "type", schemas[i].type); + + if (schemas[i].identifier_field_ids_length > 0) + { + appendStringInfoString(command, ", "); + appendStringInfoString(command, "\"identifier-field-ids\":"); + AppendIntArray(command, schemas[i].identifier_field_ids, + schemas[i].identifier_field_ids_length); + } + + /* Append fields */ + appendStringInfoString(command, ", "); + + appendStringInfoString(command, "\"fields\":"); + AppendIcebergStructFields(command, schemas[i].fields, schemas[i].fields_length); + + appendStringInfoString(command, "}"); + + if (i < schemas_length - 1) + { + appendStringInfoString(command, ", "); + } + } +} + + static void AppendIcebergPartitionSpecs(StringInfo command, IcebergPartitionSpec * specs, size_t specs_length) { @@ -681,7 +721,7 @@ AppendIcebergStructFields(StringInfo command, FieldStructElement * fields, size_ } -static void +void AppendIcebergPartitionSpecFields(StringInfo command, IcebergPartitionSpecField * fields, size_t fields_length) { appendStringInfoString(command, "["); diff --git a/pg_lake_iceberg/src/rest_catalog/rest_catalog.c b/pg_lake_iceberg/src/rest_catalog/rest_catalog.c index 860dfba1..33d7bff6 100644 --- a/pg_lake_iceberg/src/rest_catalog/rest_catalog.c +++ b/pg_lake_iceberg/src/rest_catalog/rest_catalog.c @@ -26,6 +26,9 @@ #include "utils/jsonb.h" #include "utils/lsyscache.h" +#include "pg_lake/iceberg/api/table_schema.h" +#include "pg_lake/iceberg/metadata_spec.h" + #include "pg_lake/http/http_client.h" #include "pg_lake/object_store_catalog/object_store_catalog.h" #include "pg_lake/rest_catalog/rest_catalog.h" @@ -35,6 +38,7 @@ #include "pg_lake/util/url_encode.h" #include "pg_lake/util/rel_utils.h" + /* determined by GUC */ char *RestCatalogHost = "http://localhost:8181"; char *RestCatalogClientId 
= NULL; @@ -44,9 +48,176 @@ char *RestCatalogClientSecret = NULL; static void CreateNamespaceOnRestCatalog(const char *catalogName, const char *namespaceName); static char *EncodeBasicAuth(const char *clientId, const char *clientSecret); static char *JsonbGetStringByPath(const char *jsonb_text, int nkeys,...); -static List *PostHeadersWithAuth(void); static List *GetHeadersWithAuth(void); -static void ReportHTTPError(HttpResult httpResult, int level); +static char *AppendIcebergPartitionSpecForRestCatalogStage(List *partitionSpecs); + +/* +* StartStageRestCatalogIcebergTableCreate stages the creation of an iceberg table +* in the rest catalog. On any failure, an error is raised. If the table exists, +* an error is raised as well. +* +* As per REST catalog spec, we need to provide an empty schema when creating +* a table. The schema will be updated when we make this table visible/committed. +* The main reason for staging early is to be able to get the vended credentials +* for writable tables. +*/ +void +StartStageRestCatalogIcebergTableCreate(Oid relationId) +{ + const char *catalogName = GetRestCatalogName(relationId); + const char *namespaceName = GetRestCatalogNamespace(relationId); + const char *relationName = GetRestCatalogTableName(relationId); + + StringInfo body = makeStringInfo(); + + appendStringInfoChar(body, '{'); /* start body */ + appendJsonString(body, "name", relationName); + + appendStringInfoString(body, ", "); + appendJsonKey(body, "schema"); + + appendStringInfoChar(body, '{'); /* start schema object */ + + appendJsonString(body, "type", "struct"); + appendStringInfoString(body, ", "); + appendJsonKey(body, "fields"); + appendStringInfoString(body, "[]"); /* empty fields array, we don't know + * the schema yet */ + + appendStringInfoChar(body, '}'); /* close schema object */ + appendStringInfoString(body, ", "); + + appendJsonString(body, "stage-create", "true"); + + appendStringInfoChar(body, '}'); /* close body */ + + char *postUrl = + psprintf(REST_CATALOG_TABLES, RestCatalogHost, + URLEncodePath(catalogName), URLEncodePath(namespaceName)); + List *headers = PostHeadersWithAuth(); + + /* + * TODO: Should we make this configurable? Some object stores may require + * different headers or authentication methods. + */ + char *vendedCreds = pstrdup("X-Iceberg-Access-Delegation: vended-credentials"); + + headers = lappend(headers, vendedCreds); + + HttpResult httpResult = HttpPost(postUrl, body->data, headers); + + if (httpResult.status != 200) + { + ReportHTTPError(httpResult, ERROR); + } +} + + +/* +* FinishStageRestCatalogIcebergTableCreateRestRequest creates the REST catalog +* request to finalize the staging of an iceberg table creation in the rest +* catalog. 
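+*
+* The body built below is roughly of the following shape (abridged,
+* values illustrative):
+*
+*   {"requirements": [{"type": "assert-create"}],
+*    "updates": [{"action": "add-schema", "schema": {...}},
+*                {"action": "add-sort-order", "sort-order": {...}},
+*                {"action": "set-default-sort-order", "sort-order-id": 0},
+*                {"action": "set-location", "location": "..."},
+*                {"action": "add-spec", "spec": {...}},
+*                {"action": "set-default-spec", "spec-id": -1}]}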
+*/ +char * +FinishStageRestCatalogIcebergTableCreateRestRequest(Oid relationId, DataFileSchema * dataFileSchema, List *partitionSpecs) +{ + StringInfo body = makeStringInfo(); + + appendStringInfoChar(body, '{'); + + appendJsonKey(body, "requirements"); + appendStringInfoChar(body, '['); /* start requirements array */ + appendStringInfoChar(body, '{'); /* start requirements element */ + + appendJsonString(body, "type", "assert-create"); + + appendStringInfoChar(body, '}'); /* close requirements element */ + appendStringInfoChar(body, ']'); /* close requirements array */ + + appendStringInfoChar(body, ','); + + appendJsonKey(body, "updates"); + appendStringInfoChar(body, '['); /* start updates array */ + appendStringInfoChar(body, '{'); /* start updates element */ + + appendJsonString(body, "action", "add-schema"); + + appendStringInfoChar(body, ','); + + int lastColumnId = 0; + IcebergTableSchema *newSchema = + RebuildIcebergSchemaFromDataFileSchema(relationId, dataFileSchema, &lastColumnId); + int schemaCount = 1; + + AppendIcebergTableSchemaForRestCatalogStage(body, newSchema, schemaCount); + appendStringInfoChar(body, '}'); /* close updates element */ + + appendStringInfoChar(body, ','); + appendStringInfoChar(body, '{'); /* start add-sort-order */ + appendJsonString(body, "action", "add-sort-order"); + appendStringInfoString(body, ", "); + appendJsonKey(body, "sort-order"); + appendStringInfoChar(body, '{'); /* start sort-order object */ + appendJsonInt32(body, "order-id", 0); + appendStringInfoString(body, ", "); + appendJsonKey(body, "fields"); + appendStringInfoString(body, "[]"); /* empty fields array */ + appendStringInfoChar(body, '}'); /* finish sort-order object */ + appendStringInfoChar(body, '}'); /* finish add-sort-order */ + appendStringInfoChar(body, ','); + appendStringInfoChar(body, '{'); /* start add-sort-order */ + appendJsonString(body, "action", "set-default-sort-order"); + appendStringInfoString(body, ", "); + appendJsonInt32(body, "sort-order-id", 0); + appendStringInfoChar(body, '}'); /* finish add-sort-order */ + + appendStringInfoString(body, ", "); + appendStringInfoChar(body, '{'); /* start set-location */ + appendJsonString(body, "action", "set-location"); + appendStringInfoChar(body, ','); + + /* construct location */ + StringInfo location = makeStringInfo(); + const char *catalogName = GetRestCatalogName(relationId); + const char *namespaceName = GetRestCatalogNamespace(relationId); + const char *relationName = GetRestCatalogTableName(relationId); + + appendStringInfo(location, "%s/%s/%s/%s/%d", IcebergDefaultLocationPrefix, catalogName, namespaceName, relationName, relationId); + appendJsonString(body, "location", location->data); + appendStringInfoChar(body, '}'); /* end set-location */ + + /* add partition spec */ + appendStringInfoChar(body, ','); + + ListCell *partitionSpecCell = NULL; + + foreach(partitionSpecCell, partitionSpecs) + { + IcebergPartitionSpec *spec = (IcebergPartitionSpec *) lfirst(partitionSpecCell); + + appendStringInfoChar(body, '{'); /* start add-partition-spec */ + appendJsonString(body, "action", "add-spec"); + appendStringInfoString(body, ", "); + + appendStringInfoString(body, AppendIcebergPartitionSpecForRestCatalogStage(list_make1(spec))); + + appendStringInfoChar(body, '}'); /* finish add-partition-spec */ + appendStringInfoString(body, ", "); + } + + if (list_length(partitionSpecs) == 0) + appendStringInfoChar(body, ','); + + appendStringInfoChar(body, '{'); /* start set-default-spec */ + appendJsonString(body, "action", 
"set-default-spec"); + appendStringInfoString(body, ", "); + appendJsonInt32(body, "spec-id", -1); /* -1 means latest */ + appendStringInfoChar(body, '}'); /* finish set-default-spec */ + appendStringInfoChar(body, ']'); /* end updates array */ + appendStringInfoChar(body, '}'); + + return body->data; +} /* @@ -199,7 +370,7 @@ char * GetMetadataLocationFromRestCatalog(const char *restCatalogName, const char *namespaceName, const char *relationName) { char *getUrl = - psprintf(GET_REST_CATALOG_METADATA_LOCATION, + psprintf(REST_CATALOG_TABLE, RestCatalogHost, URLEncodePath(restCatalogName), URLEncodePath(namespaceName), URLEncodePath(relationName)); List *headers = GetHeadersWithAuth(); @@ -257,7 +428,7 @@ CreateNamespaceOnRestCatalog(const char *catalogName, const char *namespaceName) /* * Creates the headers for a POST request with authentication. */ -static List * +List * PostHeadersWithAuth(void) { return list_make3(psprintf("Authorization: Bearer %s", RestCatalogFetchAccessToken()), @@ -285,7 +456,7 @@ GetHeadersWithAuth(void) * "code": 400 * } */ -static void +void ReportHTTPError(HttpResult httpResult, int level) { /* @@ -611,3 +782,31 @@ GetRestCatalogName(Oid relationId) return get_database_name(MyDatabaseId); } + + + +static char * +AppendIcebergPartitionSpecForRestCatalogStage(List *partitionSpecs) +{ + StringInfo command = makeStringInfo(); + + ListCell *partitionSpecCell = NULL; + + foreach(partitionSpecCell, partitionSpecs) + { + IcebergPartitionSpec *spec = (IcebergPartitionSpec *) lfirst(partitionSpecCell); + + appendJsonKey(command, "spec"); + appendStringInfoString(command, "{"); + + /* append spec-id */ + appendJsonInt32(command, "spec-id", spec->spec_id); + + /* Append fields */ + appendStringInfoString(command, ", \"fields\":"); + AppendIcebergPartitionSpecFields(command, spec->fields, spec->fields_length); + + appendStringInfoString(command, "}"); + } + return command->data; +} diff --git a/pg_lake_table/include/pg_lake/transaction/track_iceberg_metadata_changes.h b/pg_lake_table/include/pg_lake/transaction/track_iceberg_metadata_changes.h index f3688b75..feb5917d 100644 --- a/pg_lake_table/include/pg_lake/transaction/track_iceberg_metadata_changes.h +++ b/pg_lake_table/include/pg_lake/transaction/track_iceberg_metadata_changes.h @@ -32,9 +32,22 @@ typedef struct TableMetadataOperationTracker bool relationSnapshotExpirationRequested; } TableMetadataOperationTracker; +typedef enum RestCatalogOperationType +{ + REST_CATALOG_CREATE_ICEBERG_TABLE = 0, + REST_CATALOG_ADD_SNAPSHOT = 1, + REST_CATALOG_ADD_SCHEMA = 2, + REST_CATALOG_ADD_PARTITION = 3, +} RestCatalogOperationType; + extern PGDLLEXPORT void ConsumeTrackedIcebergMetadataChanges(void); +extern PGDLLEXPORT void PostAllRestCatalogRequests(void); extern PGDLLEXPORT void TrackIcebergMetadataChangesInTx(Oid relationId, List *metadataOperationTypes); +extern PGDLLEXPORT void RecordRestCatalogRequestInTx(Oid relationId, RestCatalogOperationType operationType, + const char *catalogName, const char *catalogNamespace, + const char *catalogTableName, const char *body); extern PGDLLEXPORT void ResetTrackedIcebergMetadataOperation(void); +extern PGDLLEXPORT void ResetRestCatalogRequests(void); extern PGDLLEXPORT HTAB *GetTrackedIcebergMetadataOperations(void); extern PGDLLEXPORT bool HasAnyTrackedIcebergMetadataChanges(void); extern PGDLLEXPORT bool IsIcebergTableCreatedInCurrentTransaction(Oid relation); diff --git a/pg_lake_table/src/ddl/create_table.c b/pg_lake_table/src/ddl/create_table.c index 25727302..f538cf51 100644 
--- a/pg_lake_table/src/ddl/create_table.c
+++ b/pg_lake_table/src/ddl/create_table.c
@@ -944,6 +944,29 @@ ProcessCreateIcebergTableFromForeignTableStmt(ProcessUtilityParams * params)
	 */
	Assert(list_length(GetRestrictedColumnDefList(columnDefList)) == list_length(columnDefList));

+
+	if (hasRestCatalogOption)
+	{
+		/* this code-path only deals with writable rest catalog tables */
+		Assert(!HasReadOnlyOption(createStmt->options));
+
+		/*
+		 * We have to start staging the create table for writable rest
+		 * catalog tables here, because the initial staging allows us to get
+		 * the vended credentials for this transaction. For example, if the
+		 * rest catalog table is created via CTAS, the CTAS command may need
+		 * to read/write data on S3 using the vended credentials. Also note
+		 * that staging consists of two steps:
+		 *
+		 * 1. StartStageRestCatalogIcebergTableCreate: creates the table in
+		 * the rest catalog with a "staged" status.
+		 *
+		 * 2. FinishStageRestCatalogIcebergTableCreateRestRequest: builds
+		 * the request that finalizes the table creation in the rest
+		 * catalog; it is posted in post-commit, after the local table
+		 * creation has succeeded.
+		 */
+		StartStageRestCatalogIcebergTableCreate(relationId);
+	}
+
+
	List	   *ddlOps = NIL;
	IcebergDDLOperation *createDDLOp = palloc0(sizeof(IcebergDDLOperation));
diff --git a/pg_lake_table/src/ddl/ddl_changes.c b/pg_lake_table/src/ddl/ddl_changes.c
index b9f192b7..c797c879 100644
--- a/pg_lake_table/src/ddl/ddl_changes.c
+++ b/pg_lake_table/src/ddl/ddl_changes.c
@@ -42,7 +42,8 @@
 static void ApplyDDLCatalogChanges(Oid relationId, List *ddlOperations,
								   List **droppedColumns,
								   char **metadataLocation,
-								   IcebergPartitionSpec * *partitionSpec);
+								   IcebergPartitionSpec * *partitionSpec,
+								   bool *isIcebergTableCreated);

 static bool DDLsRequireIcebergSchemaChange(List *ddlOperations);
@@ -56,12 +57,11 @@ ApplyDDLChanges(Oid relationId, List *ddlOperations)

	List	   *droppedColumns = NIL;
	char	   *metadataLocation = NULL;
+	bool		isIcebergTableCreated = false;

	IcebergPartitionSpec *partitionSpec = NULL;

-	ApplyDDLCatalogChanges(relationId, ddlOperations, &droppedColumns, &metadataLocation, &partitionSpec);
-
-	bool		isIcebergTableCreated = metadataLocation != NULL;
+	ApplyDDLCatalogChanges(relationId, ddlOperations, &droppedColumns, &metadataLocation, &partitionSpec, &isIcebergTableCreated);

	bool		isPartitionByChange = partitionSpec != NULL;

	if (isIcebergTableCreated)
@@ -90,8 +90,11 @@ ApplyDDLChanges(Oid relationId, List *ddlOperations)
 static void
 ApplyDDLCatalogChanges(Oid relationId, List *ddlOperations,
					   List **droppedColumns,
					   char **metadataLocation,
-					   IcebergPartitionSpec * *partitionSpec)
+					   IcebergPartitionSpec * *partitionSpec,
+					   bool *isIcebergTableCreated)
 {
+	*isIcebergTableCreated = false;
+
	ListCell   *ddlOperationCell = NULL;

	foreach(ddlOperationCell, ddlOperations)
@@ -107,8 +110,12 @@ ApplyDDLCatalogChanges(Oid relationId, List *ddlOperations,
							errmsg("cannot create the same iceberg table multiple times")));
			}

+			*isIcebergTableCreated = true;
+
			/* register the table */
-			*metadataLocation = GenerateInitialIcebergTableMetadataPath(relationId);
+			IcebergCatalogType catalogType = GetIcebergCatalogType(relationId);
+
+			*metadataLocation = catalogType == REST_CATALOG_READ_WRITE ?
NULL : GenerateInitialIcebergTableMetadataPath(relationId); InsertInternalIcebergCatalogTable(relationId, *metadataLocation, ddlOperation->hasCustomLocation); /* register mappings for new columns */ @@ -121,6 +128,17 @@ ApplyDDLCatalogChanges(Oid relationId, List *ddlOperations, } else if (ddlOperation->type == DDL_TABLE_DROP) { + IcebergCatalogType catalogType = GetIcebergCatalogType(relationId); + + if (catalogType == REST_CATALOG_READ_WRITE) + { + /* + * TODO: Handle dropping of writable rest catalog iceberg + * tables here. + */ + return; + } + /* * This is not an expected case, either user manually messed with * the catalog or we have a bug. Still, we should not fail the diff --git a/pg_lake_table/src/ddl/vacuum.c b/pg_lake_table/src/ddl/vacuum.c index 0e3e8ab8..6551c922 100644 --- a/pg_lake_table/src/ddl/vacuum.c +++ b/pg_lake_table/src/ddl/vacuum.c @@ -1052,7 +1052,9 @@ GetMetadataLocationPrefixForRelationId(Oid relationId) } else { - char *metadataLocation = GetIcebergCatalogMetadataLocation(relationId, false); + IcebergCatalogType catalogType = GetIcebergCatalogType(relationId); + + char *metadataLocation = catalogType == REST_CATALOG_READ_WRITE ? GetMetadataLocationForRestCatalogForIcebergTable(relationId) : GetIcebergCatalogMetadataLocation(relationId, false); IcebergTableMetadata *metadata = ReadIcebergTableMetadata(metadataLocation); /* cast (const char *) to (char *) */ diff --git a/pg_lake_table/src/transaction/external_heavy_asserts.c b/pg_lake_table/src/transaction/external_heavy_asserts.c index a0cb453a..07c2b786 100644 --- a/pg_lake_table/src/transaction/external_heavy_asserts.c +++ b/pg_lake_table/src/transaction/external_heavy_asserts.c @@ -31,6 +31,7 @@ #include "pg_lake/fdw/snapshot.h" #include "pg_lake/fdw/schema_operations/register_field_ids.h" #include "pg_lake/parquet/leaf_field.h" +#include "pg_lake/rest_catalog/rest_catalog.h" #include "pg_lake/transaction/track_iceberg_metadata_changes.h" #include "pg_lake/transaction/transaction_hooks.h" @@ -102,6 +103,14 @@ ExternalHeavyAssertsOnIcebergMetadataChange(void) if (!RelationExistsInTheIcebergCatalog(relationId)) continue; + IcebergCatalogType catalogType = GetIcebergCatalogType(relationId); + + if (catalogType == REST_CATALOG_READ_WRITE) + { + /* TODO: should be trivial to extend */ + continue; + } + if (opTracker->relationDataFileChanged) { bool dataOnly = false; diff --git a/pg_lake_table/src/transaction/track_iceberg_metadata_changes.c b/pg_lake_table/src/transaction/track_iceberg_metadata_changes.c index 1ba5fb03..fcf601d0 100644 --- a/pg_lake_table/src/transaction/track_iceberg_metadata_changes.c +++ b/pg_lake_table/src/transaction/track_iceberg_metadata_changes.c @@ -32,14 +32,17 @@ #include "pg_lake/iceberg/operations/manifest_merge.h" #include "pg_lake/iceberg/partitioning/spec_generation.h" #include "pg_lake/partitioning/partition_spec_catalog.h" +#include "pg_lake/rest_catalog/rest_catalog.h" #include "pg_lake/transaction/track_iceberg_metadata_changes.h" #include "pg_lake/transaction/transaction_hooks.h" #include "pg_lake/util/injection_points.h" +#include "pg_lake/util/url_encode.h" static void ApplyTrackedIcebergMetadataChanges(void); static void RecordIcebergMetadataOperation(Oid relationId, TableMetadataOperationType operationType); static void InitTableMetadataTrackerHashIfNeeded(void); +static void InitRestCatalogRequestsHashIfNeeded(void); static HTAB *CreateDataFilesHashForMetadata(IcebergTableMetadata * metadata); static void FindChangedFilesSinceMetadata(HTAB *currentFilesMap, 
diff --git a/pg_lake_table/src/transaction/external_heavy_asserts.c b/pg_lake_table/src/transaction/external_heavy_asserts.c
index a0cb453a..07c2b786 100644
--- a/pg_lake_table/src/transaction/external_heavy_asserts.c
+++ b/pg_lake_table/src/transaction/external_heavy_asserts.c
@@ -31,6 +31,7 @@
 #include "pg_lake/fdw/snapshot.h"
 #include "pg_lake/fdw/schema_operations/register_field_ids.h"
 #include "pg_lake/parquet/leaf_field.h"
+#include "pg_lake/rest_catalog/rest_catalog.h"
 #include "pg_lake/transaction/track_iceberg_metadata_changes.h"
 #include "pg_lake/transaction/transaction_hooks.h"
 
@@ -102,6 +103,14 @@ ExternalHeavyAssertsOnIcebergMetadataChange(void)
 		if (!RelationExistsInTheIcebergCatalog(relationId))
 			continue;
 
+		IcebergCatalogType catalogType = GetIcebergCatalogType(relationId);
+
+		if (catalogType == REST_CATALOG_READ_WRITE)
+		{
+			/* TODO: should be trivial to extend */
+			continue;
+		}
+
 		if (opTracker->relationDataFileChanged)
 		{
 			bool dataOnly = false;
diff --git a/pg_lake_table/src/transaction/track_iceberg_metadata_changes.c b/pg_lake_table/src/transaction/track_iceberg_metadata_changes.c
index 1ba5fb03..fcf601d0 100644
--- a/pg_lake_table/src/transaction/track_iceberg_metadata_changes.c
+++ b/pg_lake_table/src/transaction/track_iceberg_metadata_changes.c
@@ -32,14 +32,17 @@
 #include "pg_lake/iceberg/operations/manifest_merge.h"
 #include "pg_lake/iceberg/partitioning/spec_generation.h"
 #include "pg_lake/partitioning/partition_spec_catalog.h"
+#include "pg_lake/rest_catalog/rest_catalog.h"
 #include "pg_lake/transaction/track_iceberg_metadata_changes.h"
 #include "pg_lake/transaction/transaction_hooks.h"
 #include "pg_lake/util/injection_points.h"
+#include "pg_lake/util/url_encode.h"
 
 static void ApplyTrackedIcebergMetadataChanges(void);
 static void RecordIcebergMetadataOperation(Oid relationId, TableMetadataOperationType operationType);
 static void InitTableMetadataTrackerHashIfNeeded(void);
+static void InitRestCatalogRequestsHashIfNeeded(void);
 static HTAB *CreateDataFilesHashForMetadata(IcebergTableMetadata * metadata);
 static void FindChangedFilesSinceMetadata(HTAB *currentFilesMap,
 										  IcebergTableMetadata * metadata,
 										  List **addedFiles,
 										  List **removedFilePaths);
@@ -53,11 +56,27 @@ static void DeleteInProgressAddedFiles(Oid relationId, List *addedFiles);
 
 static int	ComparePartitionSpecsById(const ListCell *a, const ListCell *b);
 
+typedef struct RestCatalogRequest
+{
+	Oid relationId;
+
+	char *catalogName;
+	char *catalogNamespace;
+	char *catalogTableName;
+
+	char *createTableBody;
+	char *addSnapshotBody;
+} RestCatalogRequest;
+
 /*
  * Hash table to track iceberg metadata operations per relation within a transaction.
  */
 static HTAB *TrackedIcebergMetadataOperationsHash = NULL;
 
+/*
+ * Hash table to track rest catalog requests per relation within a transaction.
+ */
+static HTAB *RestCatalogRequestsHash = NULL;
+
 /*
  * TrackIcebergMetadataChangesInTx tracks metadata changes for a given relation
@@ -146,6 +165,80 @@ ResetTrackedIcebergMetadataOperation(void)
 }
 
 
+void
+ResetRestCatalogRequests(void)
+{
+	RestCatalogRequestsHash = NULL;
+}
+
+void
+PostAllRestCatalogRequests(void)
+{
+	if (RestCatalogRequestsHash == NULL)
+	{
+		return;
+	}
+
+	/*
+	 * We need to iterate over the RestCatalogRequestsHash twice:
+	 *
+	 * 1. First, post the create table requests to create the iceberg
+	 * tables in the rest catalog.
+	 *
+	 * 2. Then, post all the other modifications (like adding snapshots,
+	 * partition specs, etc.)
+	 *
+	 * This is because the create table requests need to be completed before
+	 * we can add snapshots to the tables, and the REST API does not support
+	 * batching a create table request together with any other request.
+	 */
+	HASH_SEQ_STATUS status;
+
+	hash_seq_init(&status, RestCatalogRequestsHash);
+	RestCatalogRequest *request;
+
+	while ((request = hash_seq_search(&status)) != NULL)
+	{
+		if (request->createTableBody == NULL)
+		{
+			/*
+			 * not a create table request
+			 */
+			continue;
+		}
+
+		const char *url =
+			psprintf(REST_CATALOG_TABLE,
+					 RestCatalogHost,
+					 URLEncodePath(request->catalogName),
+					 URLEncodePath(request->catalogNamespace),
+					 URLEncodePath(request->catalogTableName));
+
+		HttpResult httpResult = HttpPost(url, request->createTableBody, PostHeadersWithAuth());
+
+		if (httpResult.status != 200)
+		{
+			ReportHTTPError(httpResult, WARNING);
+
+			/*
+			 * TODO: decide whether a failure here should stop posting the
+			 * remaining requests.
+			 */
+		}
+	}
+
+	/*
+	 * Now that all create table requests have been posted, we can post all
+	 * the other modifications.
+	 */
+	hash_seq_init(&status, RestCatalogRequestsHash);
+
+	while ((request = hash_seq_search(&status)) != NULL)
+	{
+		/*
+		 * TODO: implement other request types like adding snapshots,
+		 * partition specs, etc.
+		 */
+	}
+}
+
+
 /*
  * RecordIcebergMetadataOperation records a metadata operation for a relation.
  * This is used to track changes to the iceberg metadata during a transaction.
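The two-pass ordering in PostAllRestCatalogRequests can be summarized with a small sketch. It assumes a dict of buffered per-relation requests and a `requests.Session`-like sender; the names are illustrative, not the extension's code.

```
# Sketch of the two-pass posting: creates must land before anything that
# references the newly created tables, and the REST API cannot batch them.
def post_all(session, base_url, buffered_requests):
    # pass 1: table creations
    for req in buffered_requests.values():
        if req.get("create_table_body") is not None:
            session.post(
                f"{base_url}/namespaces/{req['namespace']}/tables",
                json=req["create_table_body"],
            )
    # pass 2: remaining updates (snapshots, partition specs, ...);
    # the C side leaves this as a TODO for now
    for req in buffered_requests.values():
        if req.get("create_table_body") is None:
            pass  # not implemented yet
```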
@@ -230,6 +323,72 @@ InitTableMetadataTrackerHashIfNeeded(void)
 	}
 }
 
+/*
+ * InitRestCatalogRequestsHashIfNeeded is a helper function to manage the
+ * initialization of the hash. We allocate the hash and entries in
+ * TopTransactionContext.
+ */
+static void
+InitRestCatalogRequestsHashIfNeeded(void)
+{
+	if (RestCatalogRequestsHash == NULL)
+	{
+		HASHCTL ctl;
+
+		MemSet(&ctl, 0, sizeof(ctl));
+		ctl.keysize = sizeof(Oid);
+		ctl.entrysize = sizeof(RestCatalogRequest);
+		ctl.hash = oid_hash;
+		ctl.hcxt = TopTransactionContext;
+
+		RestCatalogRequestsHash = hash_create("Rest Catalog Requests",
+											  32, &ctl,
+											  HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+	}
+}
+
+
+/*
+ * RecordRestCatalogRequestInTx records a REST catalog request to be sent at
+ * post-commit.
+ */
+void
+RecordRestCatalogRequestInTx(Oid relationId, RestCatalogOperationType operationType,
+							 const char *catalogName,
+							 const char *catalogNamespace,
+							 const char *catalogTableName,
+							 const char *body)
+{
+	InitRestCatalogRequestsHashIfNeeded();
+
+	bool isFound = false;
+	RestCatalogRequest *request =
+		hash_search(RestCatalogRequestsHash,
+					&relationId, HASH_ENTER, &isFound);
+
+	if (!isFound)
+	{
+		memset(request, 0, sizeof(RestCatalogRequest));
+		request->relationId = relationId;
+
+		request->catalogName = MemoryContextStrdup(TopTransactionContext, catalogName);
+		request->catalogNamespace = MemoryContextStrdup(TopTransactionContext, catalogNamespace);
+		request->catalogTableName = MemoryContextStrdup(TopTransactionContext, catalogTableName);
+	}
+
+	/*
+	 * Always allocate in TopTransactionContext as we need this in
+	 * post-commit.
+	 */
+	MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
+
+	if (operationType == REST_CATALOG_CREATE_ICEBERG_TABLE)
+	{
+		request->createTableBody = pstrdup(body);
+	}
+
+	MemoryContextSwitchTo(oldContext);
+}
+
 
 /*
  * ConsumeTrackedIcebergMetadataChanges consumes the tracked metadata operations and
@@ -279,6 +438,23 @@ ApplyTrackedIcebergMetadataChanges(void)
 
 		List *ddlOps = GetDDLMetadataOperations(opTracker);
 		metadataOperations = list_concat(metadataOperations, ddlOps);
+
+		if (opTracker->relationCreated &&
+			GetIcebergCatalogType(relationId) == REST_CATALOG_READ_WRITE)
+		{
+			TableMetadataOperation *createOp = linitial(metadataOperations);
+
+			char *body =
+				FinishStageRestCatalogIcebergTableCreateRestRequest(relationId,
+																	createOp->schema,
+																	createOp->partitionSpecs);
+
+			RecordRestCatalogRequestInTx(relationId, REST_CATALOG_CREATE_ICEBERG_TABLE,
+										 GetRestCatalogName(relationId),
+										 GetRestCatalogNamespace(relationId),
+										 GetRestCatalogTableName(relationId),
+										 body);
+		}
 	}
 
 	/* apply all data file operations at once */
diff --git a/pg_lake_table/src/transaction/transaction_hooks.c b/pg_lake_table/src/transaction/transaction_hooks.c
index 28c25f86..f3f91184 100644
--- a/pg_lake_table/src/transaction/transaction_hooks.c
+++ b/pg_lake_table/src/transaction/transaction_hooks.c
@@ -64,6 +64,7 @@ IcebergXactCallback(XactEvent event, void *arg)
 		case XACT_EVENT_ABORT:
 			{
 				ResetTrackedIcebergMetadataOperation();
+				ResetRestCatalogRequests();
 				break;
 			}
 		case XACT_EVENT_PRE_PREPARE:
@@ -79,6 +80,8 @@ IcebergXactCallback(XactEvent event, void *arg)
 		case XACT_EVENT_COMMIT:
 		case XACT_EVENT_PARALLEL_COMMIT:
 			{
+				PostAllRestCatalogRequests();
+				ResetRestCatalogRequests();
 				break;
 			}
 	}
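The hook wiring amounts to a simple buffer-and-flush pattern; a toy model, illustrative only and not the extension's API:

```
# Toy model of the transaction-hook wiring: buffered catalog requests are
# only flushed after the local transaction commits, and an abort silently
# drops them, so the rest catalog never sees a failed transaction.
class RestCatalogRequestBuffer:
    def __init__(self, sender):
        self._sender = sender  # callable taking the list of requests
        self._requests = []

    def record(self, request):
        self._requests.append(request)

    def on_commit(self):
        self._sender(self._requests)
        self._requests.clear()

    def on_abort(self):
        self._requests.clear()
```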
diff --git a/pg_lake_table/tests/pytests/test_polaris_catalog.py b/pg_lake_table/tests/pytests/test_polaris_catalog.py
index fe7464ed..e1a043fe 100644
--- a/pg_lake_table/tests/pytests/test_polaris_catalog.py
+++ b/pg_lake_table/tests/pytests/test_polaris_catalog.py
@@ -35,11 +35,42 @@ def test_polaris_catalog_running(pg_conn, s3, polaris_session, installcheck):
     assert resp.ok, f"Polaris is not running: {resp.status_code} {resp.text}"
 
 
+def test_writable_rest_basic_flow(
+    pg_conn, s3, polaris_session, set_polaris_gucs, with_default_location, installcheck
+):
+
+    if installcheck:
+        return
+
+    run_command(f"""CREATE SCHEMA test_writable_rest_basic_flow""", pg_conn)
+    run_command(
+        f"""CREATE TABLE test_writable_rest_basic_flow.writable_rest(a int) USING iceberg WITH (catalog='rest')""",
+        pg_conn,
+    )
+
+    pg_conn.commit()
+
+    run_command(
+        f"""CREATE TABLE test_writable_rest_basic_flow.readable_rest() USING iceberg WITH (catalog='rest', read_only=True, catalog_table_name='writable_rest')""",
+        pg_conn,
+    )
+
+    columns = run_query(
+        "SELECT attname FROM pg_attribute WHERE attrelid = 'test_writable_rest_basic_flow.readable_rest'::regclass and attnum > 0",
+        pg_conn,
+    )
+    assert len(columns) == 1
+    assert columns[0][0] == "a"
+
+    run_command(f"""DROP SCHEMA test_writable_rest_basic_flow CASCADE""", pg_conn)
+    pg_conn.commit()
+
+
 namespaces = [
     "regular_name",
-    "regular ..!!**(());;//??::@@&&==++$$,,## name",
+    "regular..!!**(());;//??::@@&&==++$$,,#name",
     "Special-Table!_With.Multiple_Uses_Of@Chars#-Here~And*Here!name",
-    " !~*();/?:@&=+$,#",
+    "!~*();/?:@&=+$,#",
 ]
 
 
@@ -67,6 +98,9 @@ def test_create_namespace(
     )
     pg_conn.commit()
 
+    # no-op, just to make sure VACUUM is not broken for rest catalog tables
+    run_command_outside_tx([f"""VACUUM "{namespace}".tbl"""], pg_conn)
+
     encoded_namespace = run_query(
         f"SELECT lake_iceberg.url_encode_path('{namespace}')", pg_conn
     )[0][0]
@@ -132,13 +166,13 @@ def test_create_namespace_in_tx(
     run_command(f'''CREATE SCHEMA "{namespace}_2"''', pg_conn)
 
     run_command(
-        f"""CREATE TABLE "{namespace}".tbl(a int) USING iceberg WITH (catalog='rest');""",
+        f"""CREATE TABLE "{namespace}".tbl_10(a int) USING iceberg WITH (catalog='rest');""",
         pg_conn,
     )
     pg_conn.commit()
 
     run_command(
-        f"""CREATE TABLE "{namespace}_2".tbl(a int) USING iceberg WITH (catalog='rest');""",
+        f"""CREATE TABLE "{namespace}_2".tbl_11(a int) USING iceberg WITH (catalog='rest');""",
         pg_conn,
     )
     pg_conn.commit()
@@ -176,7 +210,7 @@ def test_create_namespace_rollback(
     run_command(f'''CREATE SCHEMA "{namespace}"''', pg_conn)
 
     run_command(
-        f"""CREATE TABLE "{namespace}".tbl(a int) USING iceberg WITH (catalog='rest');""",
+        f"""CREATE TABLE "{namespace}".tbl_20(a int) USING iceberg WITH (catalog='rest');""",
         pg_conn,
     )
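If the test ever needs to assert directly against Polaris that the staged table was finalized after commit, a sketch along these lines could work. The endpoint follows the Iceberg REST spec; the base URL and token handling are assumptions, not what `polaris_session` provides.

```
# Hedged sketch of an extra post-commit assertion against the catalog.
import requests


def table_exists_in_catalog(base_url, namespace, table, token):
    resp = requests.get(
        f"{base_url}/namespaces/{namespace}/tables/{table}",
        headers={"Authorization": f"Bearer {token}"},
    )
    return resp.status_code == 200
```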