diff --git a/pg_lake_iceberg/include/pg_lake/iceberg/metadata_operations.h b/pg_lake_iceberg/include/pg_lake/iceberg/metadata_operations.h index 0e3949a4..a56fd6b8 100644 --- a/pg_lake_iceberg/include/pg_lake/iceberg/metadata_operations.h +++ b/pg_lake_iceberg/include/pg_lake/iceberg/metadata_operations.h @@ -19,6 +19,6 @@ #include "nodes/pg_list.h" -extern PGDLLEXPORT void ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allTransforms, bool isVerbose); +extern PGDLLEXPORT List *ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allTransforms, bool isVerbose); extern PGDLLEXPORT bool ShouldSkipMetadataChangeToIceberg(List *metadataOperationTypes); extern PGDLLEXPORT List *GetMetadataOperationTypes(List *metadataOperations); diff --git a/pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h b/pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h index 30ad0cd9..6b6f8395 100644 --- a/pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h +++ b/pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h @@ -21,6 +21,7 @@ #include "pg_lake/http/http_client.h" #include "pg_lake/util/rel_utils.h" #include "pg_lake/parquet/field.h" +#include "pg_lake/iceberg/api/snapshot.h" extern PGDLLEXPORT char *RestCatalogHost; extern char *RestCatalogClientId; @@ -36,6 +37,25 @@ extern char *RestCatalogClientSecret; #define REST_CATALOG_AUTH_TOKEN_PATH "%s/api/catalog/v1/oauth/tokens" +#define REST_CATALOG_TRANSACTION_COMMIT "%s/api/catalog/v1/%s/transactions/commit" + +typedef enum RestCatalogOperationType +{ + REST_CATALOG_CREATE_TABLE = 0, + REST_CATALOG_ADD_SNAPSHOT = 1, + REST_CATALOG_ADD_SCHEMA = 2, + REST_CATALOG_ADD_PARTITION = 3, +} RestCatalogOperationType; + + +typedef struct RestCatalogRequest +{ + Oid relationId; + RestCatalogOperationType operationType; + + char *body; +} RestCatalogRequest; + extern PGDLLEXPORT char *RestCatalogFetchAccessToken(void); extern PGDLLEXPORT void StartStageRestCatalogIcebergTableCreate(Oid relationId); extern PGDLLEXPORT char *FinishStageRestCatalogIcebergTableCreateRestRequest(Oid relationId, DataFileSchema * dataFileSchema, List *partitionSpecs); @@ -54,3 +74,4 @@ extern PGDLLEXPORT char *GetMetadataLocationFromRestCatalog(const char *restCata extern PGDLLEXPORT char *GetMetadataLocationForRestCatalogForIcebergTable(Oid relationId); extern PGDLLEXPORT void ReportHTTPError(HttpResult httpResult, int level); extern PGDLLEXPORT List *PostHeadersWithAuth(void); +extern PGDLLEXPORT RestCatalogRequest * GetAddSnapshotCatalogRequest(IcebergSnapshot * newSnapshot, Oid relationId); diff --git a/pg_lake_iceberg/src/iceberg/metadata_operations.c b/pg_lake_iceberg/src/iceberg/metadata_operations.c index 014df08d..25fae0bf 100644 --- a/pg_lake_iceberg/src/iceberg/metadata_operations.c +++ b/pg_lake_iceberg/src/iceberg/metadata_operations.c @@ -146,9 +146,11 @@ static void DeleteInProgressManifests(Oid relationId, List *manifests); * ApplyIcebergMetadataChanges applies the given metadata operations to the * iceberg metadata for the given relation. */ -void +List * ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allTransforms, bool isVerbose) { + List *restCatalogRequests = NIL; + Assert(metadataOperations != NIL); #ifdef USE_ASSERT_CHECKING @@ -244,10 +246,13 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT createNewSnapshot = true; - /* - * TODO: Create RestCatalogRequest for setting the current_snapshot_id - * in the writable rest catalog iceberg table. - */ + if (writableRestCatalogTable) + { + RestCatalogRequest *request = + GetAddSnapshotCatalogRequest(newSnapshot, relationId); + + restCatalogRequests = lappend(restCatalogRequests, request); + } } /* if we need to expire old snapshots, we do it here */ @@ -269,24 +274,18 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT /* if there were no changes to the Iceberg table, we are done */ if (!createNewSnapshot && !createNewTable) - return; + { + Assert(restCatalogRequests == NIL); + return restCatalogRequests; + } if (writableRestCatalogTable) { - - if (createNewSnapshot) - { - /* - * TODO: Create RestCatalogRequest for adding the new snapshot in - * the writable rest catalog iceberg table. - */ - } - /* * We are done, writable rest catalog iceberg tables have their * metadata updated in the catalog itself. */ - return; + return restCatalogRequests; } /* add the new snapshot to the snapshot log */ @@ -327,6 +326,13 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT } TriggerCatalogExportIfObjectStoreTable(relationId); + + /* + * for a non-writable rest table, we should not have any rest catalog + * requests + */ + Assert(restCatalogRequests == NIL); + return restCatalogRequests; } diff --git a/pg_lake_iceberg/src/rest_catalog/rest_catalog.c b/pg_lake_iceberg/src/rest_catalog/rest_catalog.c index 33d7bff6..fa8032e3 100644 --- a/pg_lake_iceberg/src/rest_catalog/rest_catalog.c +++ b/pg_lake_iceberg/src/rest_catalog/rest_catalog.c @@ -37,6 +37,7 @@ #include "pg_lake/parsetree/options.h" #include "pg_lake/util/url_encode.h" #include "pg_lake/util/rel_utils.h" +#include "pg_lake/iceberg/temporal_utils.h" /* determined by GUC */ @@ -810,3 +811,43 @@ AppendIcebergPartitionSpecForRestCatalogStage(List *partitionSpecs) } return command->data; } + + + +/* +* GetAddSnapshotCatalogRequest creates a RestCatalogRequest to add a snapshot +* to the rest catalog for the given new snapshot. +*/ +RestCatalogRequest * +GetAddSnapshotCatalogRequest(IcebergSnapshot * newSnapshot, Oid relationId) +{ + StringInfo body = makeStringInfo(); + + appendStringInfoString(body, + "{\"action\":\"add-snapshot\",\"snapshot\":{"); + + appendStringInfo(body, "\"snapshot-id\":%lld", newSnapshot->snapshot_id); + if (newSnapshot->parent_snapshot_id > 0) + appendStringInfo(body, ",\"parent-snapshot-id\":%lld", newSnapshot->parent_snapshot_id); + + /* TODO: Polaris doesn't accept 0 sequence number */ + appendStringInfo(body, ",\"sequence-number\":%lld", newSnapshot->sequence_number == 0 ? 1 : newSnapshot->sequence_number); + appendStringInfo(body, ",\"timestamp-ms\":%ld", (long) (PostgresTimestampToIcebergTimestampMs())); /* coarse ms */ + appendStringInfo(body, ",\"manifest-list\":\"%s\"", newSnapshot->manifest_list); + appendStringInfoString(body, ",\"summary\":{\"operation\": \"append\"}"); + + if (newSnapshot->schema_id > 0) + appendStringInfo(body, ",\"schema-id\":%d", newSnapshot->schema_id); + + appendStringInfoString(body, "}}, "); /* end add-snapshot */ + + appendStringInfo(body, "{\"action\":\"set-snapshot-ref\", \"type\":\"branch\", \"ref-name\":\"main\", \"snapshot-id\":%lld}", newSnapshot->snapshot_id); + + RestCatalogRequest *request = palloc0(sizeof(RestCatalogRequest)); + + request->relationId = relationId; + request->operationType = REST_CATALOG_ADD_SNAPSHOT; + request->body = body->data; + + return request; +} diff --git a/pg_lake_table/include/pg_lake/transaction/track_iceberg_metadata_changes.h b/pg_lake_table/include/pg_lake/transaction/track_iceberg_metadata_changes.h index feb5917d..ca39546a 100644 --- a/pg_lake_table/include/pg_lake/transaction/track_iceberg_metadata_changes.h +++ b/pg_lake_table/include/pg_lake/transaction/track_iceberg_metadata_changes.h @@ -19,6 +19,7 @@ #include "postgres.h" #include "access/hash.h" +#include "pg_lake/rest_catalog/rest_catalog.h" typedef struct TableMetadataOperationTracker { @@ -32,20 +33,12 @@ typedef struct TableMetadataOperationTracker bool relationSnapshotExpirationRequested; } TableMetadataOperationTracker; -typedef enum RestCatalogOperationType -{ - REST_CATALOG_CREATE_ICEBERG_TABLE = 0, - REST_CATALOG_ADD_SNAPSHOT = 1, - REST_CATALOG_ADD_SCHEMA = 2, - REST_CATALOG_ADD_PARTITION = 3, -} RestCatalogOperationType; extern PGDLLEXPORT void ConsumeTrackedIcebergMetadataChanges(void); extern PGDLLEXPORT void PostAllRestCatalogRequests(void); extern PGDLLEXPORT void TrackIcebergMetadataChangesInTx(Oid relationId, List *metadataOperationTypes); extern PGDLLEXPORT void RecordRestCatalogRequestInTx(Oid relationId, RestCatalogOperationType operationType, - const char *catalogName, const char *catalogNamespace, - const char *catalogTableName, const char *body); + const char *body); extern PGDLLEXPORT void ResetTrackedIcebergMetadataOperation(void); extern PGDLLEXPORT void ResetRestCatalogRequests(void); extern PGDLLEXPORT HTAB *GetTrackedIcebergMetadataOperations(void); diff --git a/pg_lake_table/src/transaction/track_iceberg_metadata_changes.c b/pg_lake_table/src/transaction/track_iceberg_metadata_changes.c index fcf601d0..e912bbd7 100644 --- a/pg_lake_table/src/transaction/track_iceberg_metadata_changes.c +++ b/pg_lake_table/src/transaction/track_iceberg_metadata_changes.c @@ -36,8 +36,20 @@ #include "pg_lake/transaction/track_iceberg_metadata_changes.h" #include "pg_lake/transaction/transaction_hooks.h" #include "pg_lake/util/injection_points.h" +#include "pg_lake/json/json_utils.h" #include "pg_lake/util/url_encode.h" +typedef struct RestCatalogRequestPerTable +{ + Oid relationId; + + char *catalogName; + char *catalogNamespace; + char *catalogTableName; + + List *createTableRequests; + List *tableModifyRequests; +} RestCatalogRequestPerTable; static void ApplyTrackedIcebergMetadataChanges(void); static void RecordIcebergMetadataOperation(Oid relationId, TableMetadataOperationType operationType); @@ -55,18 +67,9 @@ static List *GetDDLMetadataOperations(const TableMetadataOperationTracker * opTr static void DeleteInProgressAddedFiles(Oid relationId, List *addedFiles); static int ComparePartitionSpecsById(const ListCell *a, const ListCell *b); +static char *IdentifierJson(const char *namespaceFlat, const char *tableName); -typedef struct RestCatalogRequest -{ - Oid relationId; - - char *catalogName; - char *catalogNamespace; - char *catalogTableName; - char *createTableBody; - char *addSnapshotBody; -} RestCatalogRequest; /* * Hash table to track iceberg metadata operations per relation within a transaction. @@ -192,53 +195,136 @@ PostAllRestCatalogRequests(void) HASH_SEQ_STATUS status; hash_seq_init(&status, RestCatalogRequestsHash); - RestCatalogRequest *request; + RestCatalogRequestPerTable *requestPerTable = NULL; - while ((request = hash_seq_search(&status)) != NULL) + while ((requestPerTable = hash_seq_search(&status)) != NULL) { - if (request->createTableBody == NULL) + ListCell *requestCell = NULL; + + foreach(requestCell, requestPerTable->createTableRequests) { - /* - * not a create table request - */ - continue; - } + RestCatalogRequest *request = (RestCatalogRequest *) lfirst(requestCell); - const char *url = - psprintf(REST_CATALOG_TABLE, - RestCatalogHost, - URLEncodePath(request->catalogName), - URLEncodePath(request->catalogNamespace), - URLEncodePath(request->catalogTableName)); + Assert(request->operationType == REST_CATALOG_CREATE_TABLE); - HttpResult httpResult = HttpPost(url, request->createTableBody, PostHeadersWithAuth()); + const char *url = + psprintf(REST_CATALOG_TABLE, + RestCatalogHost, + URLEncodePath(requestPerTable->catalogName), + URLEncodePath(requestPerTable->catalogNamespace), + URLEncodePath(requestPerTable->catalogTableName)); - if (httpResult.status != 200) - { - ReportHTTPError(httpResult, WARNING); + HttpResult httpResult = HttpPost(url, request->body, PostHeadersWithAuth()); - /* - * Ouch, something failed. Should we stop sending the requests? - */ + if (httpResult.status != 200) + { + ReportHTTPError(httpResult, WARNING); + + /* + * Ouch, something failed. Should we stop sending the + * requests? + */ + } } } /* * Now that all create table requests have been posted, we can post all - * the other modifications. + * the other modifications. All table modifications are sent in a single + * HTTP request to ensure atomicity. */ + char *catalogName = NULL; + bool hasRestCatalogChanges = false; + StringInfo batchRequestBody = makeStringInfo(); + + appendStringInfo(batchRequestBody, "{"); /* start msg body */ + appendJsonKey(batchRequestBody, "table-changes"); + appendStringInfo(batchRequestBody, "["); /* start array of changes */ + hash_seq_init(&status, RestCatalogRequestsHash); - while ((request = hash_seq_search(&status)) != NULL) + while ((requestPerTable = hash_seq_search(&status)) != NULL) { + /* TODO: can we ever have multiple catalogs? */ + catalogName = requestPerTable->catalogName; + if (requestPerTable->tableModifyRequests == NIL) + { + /* + * no modifications to send for this table + */ + continue; + } + + if (hasRestCatalogChanges) + { + appendStringInfoChar(batchRequestBody, ','); /* separate previous + * table change */ + } + + appendStringInfoChar(batchRequestBody, '{'); /* start per-table json + * object */ + appendJsonKey(batchRequestBody, "identifier"); + appendStringInfo(batchRequestBody, "%s", IdentifierJson(requestPerTable->catalogNamespace, requestPerTable->catalogTableName)); + appendStringInfoChar(batchRequestBody, ','); + appendStringInfoString(batchRequestBody, "\"requirements\":[],"); + appendStringInfoString(batchRequestBody, " \"updates\":["); + + + ListCell *requestCell = NULL; + + foreach(requestCell, requestPerTable->tableModifyRequests) + { + RestCatalogRequest *request = (RestCatalogRequest *) lfirst(requestCell); + + appendStringInfoString(batchRequestBody, request->body); + + } + + appendStringInfoChar(batchRequestBody, ']'); /* close updates array */ + appendStringInfoChar(batchRequestBody, '}'); /* close per-table json + * object */ + /* - * TODO: implement other request types like adding snapshots, - * partition specs, etc. + * We have at least one change to send for this table */ + hasRestCatalogChanges = true; + } + + if (hasRestCatalogChanges) + { + appendStringInfoChar(batchRequestBody, ']'); /* close table-changes */ + appendStringInfoChar(batchRequestBody, '}'); /* close json body */ + + char *url = psprintf(REST_CATALOG_TRANSACTION_COMMIT, RestCatalogHost, catalogName); + HttpResult httpResult = HttpPost(url, batchRequestBody->data, PostHeadersWithAuth()); + + if (httpResult.status != 204) + { + ReportHTTPError(httpResult, WARNING); + } } } +/* + * IdentifierJson creates a JSON representation of an iceberg table identifier + * given its namespace and table name. + */ +static char * +IdentifierJson(const char *namespaceFlat, const char *tableName) +{ + StringInfoData out; + + initStringInfo(&out); + appendStringInfoChar(&out, '{'); + appendStringInfoString(&out, "\"namespace\":"); + appendStringInfo(&out, "[\"%s\"]", namespaceFlat); + appendStringInfoString(&out, ",\"name\":"); + appendStringInfo(&out, "\"%s\"", tableName); + appendStringInfoChar(&out, '}'); + return out.data; +} + /* * RecordIcebergMetadataOperation records a metadata operation for a relation. * This is used to track changes to the iceberg metadata during a transaction. @@ -336,7 +422,7 @@ InitRestCatalogRequestsHashIfNeeded(void) MemSet(&ctl, 0, sizeof(ctl)); ctl.keysize = sizeof(Oid); - ctl.entrysize = sizeof(RestCatalogRequest); + ctl.entrysize = sizeof(RestCatalogRequestPerTable); ctl.hash = oid_hash; ctl.hcxt = TopTransactionContext; @@ -347,32 +433,28 @@ InitRestCatalogRequestsHashIfNeeded(void) } - /* * RecordRestCatalogRequestInTx records a REST catalog request to be sent at post-commit. */ void RecordRestCatalogRequestInTx(Oid relationId, RestCatalogOperationType operationType, - const char *catalogName, - const char *catalogNamespace, - const char *catalogTableName, const char *body) { InitRestCatalogRequestsHashIfNeeded(); bool isFound = false; - RestCatalogRequest *request = + RestCatalogRequestPerTable *requestPerTable = hash_search(RestCatalogRequestsHash, &relationId, HASH_ENTER, &isFound); if (!isFound) { - memset(request, 0, sizeof(RestCatalogRequest)); - request->relationId = relationId; + memset(requestPerTable, 0, sizeof(RestCatalogRequestPerTable)); + requestPerTable->relationId = relationId; - request->catalogName = MemoryContextStrdup(TopTransactionContext, catalogName); - request->catalogNamespace = MemoryContextStrdup(TopTransactionContext, catalogNamespace); - request->catalogTableName = MemoryContextStrdup(TopTransactionContext, catalogTableName); + requestPerTable->catalogName = MemoryContextStrdup(TopTransactionContext, GetRestCatalogName(relationId)); + requestPerTable->catalogNamespace = MemoryContextStrdup(TopTransactionContext, GetRestCatalogNamespace(relationId)); + requestPerTable->catalogTableName = MemoryContextStrdup(TopTransactionContext, GetRestCatalogTableName(relationId)); } /* @@ -381,9 +463,19 @@ RecordRestCatalogRequestInTx(Oid relationId, RestCatalogOperationType operationT */ MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); - if (operationType == REST_CATALOG_CREATE_ICEBERG_TABLE) + RestCatalogRequest *request = palloc0(sizeof(RestCatalogRequest)); + + request->operationType = operationType; + + if (operationType == REST_CATALOG_CREATE_TABLE) + { + request->body = pstrdup(body); + requestPerTable->createTableRequests = list_make1(request); + } + else if (operationType == REST_CATALOG_ADD_SNAPSHOT) { - request->createTableBody = pstrdup(body); + request->body = pstrdup(body); + requestPerTable->tableModifyRequests = lappend(requestPerTable->tableModifyRequests, request); } MemoryContextSwitchTo(oldContext); @@ -449,11 +541,7 @@ ApplyTrackedIcebergMetadataChanges(void) createOp->schema, createOp->partitionSpecs); - RecordRestCatalogRequestInTx(relationId, REST_CATALOG_CREATE_ICEBERG_TABLE, - GetRestCatalogName(relationId), - GetRestCatalogNamespace(relationId), - GetRestCatalogTableName(relationId), - body); + RecordRestCatalogRequestInTx(relationId, REST_CATALOG_CREATE_TABLE, body); } } @@ -486,7 +574,19 @@ ApplyTrackedIcebergMetadataChanges(void) } if (metadataOperations != NIL) - ApplyIcebergMetadataChanges(relationId, metadataOperations, allTransforms, true); + { + List *restRequests = ApplyIcebergMetadataChanges(relationId, metadataOperations, allTransforms, true); + ListCell *requestCell = NULL; + + foreach(requestCell, restRequests) + { + RestCatalogRequest *request = lfirst(requestCell); + + RecordRestCatalogRequestInTx(relationId, request->operationType, + request->body); + } + + } } INJECTION_POINT_COMPAT("after-apply-iceberg-changes"); @@ -663,7 +763,11 @@ GetLastPushedIcebergMetadata(const TableMetadataOperationTracker * opTracker) return NULL; /* read the most recently pushed iceberg metadata for the table */ - char *metadataPath = GetIcebergCatalogMetadataLocation(opTracker->relationId, false); + IcebergCatalogType catalogType = GetIcebergCatalogType(opTracker->relationId); + + + char *metadataPath = + catalogType == REST_CATALOG_READ_WRITE ? GetMetadataLocationForRestCatalogForIcebergTable(opTracker->relationId) : GetIcebergCatalogMetadataLocation(opTracker->relationId, false); return ReadIcebergTableMetadata(metadataPath); } diff --git a/pg_lake_table/tests/pytests/test_polaris_catalog.py b/pg_lake_table/tests/pytests/test_polaris_catalog.py index e1a043fe..49866c48 100644 --- a/pg_lake_table/tests/pytests/test_polaris_catalog.py +++ b/pg_lake_table/tests/pytests/test_polaris_catalog.py @@ -35,6 +35,7 @@ def test_polaris_catalog_running(pg_conn, s3, polaris_session, installcheck): assert resp.ok, f"Polaris is not running: {resp.status_code} {resp.text}" +# fetch_data_files_used def test_writable_rest_basic_flow( pg_conn, s3, polaris_session, set_polaris_gucs, with_default_location, installcheck ): @@ -44,7 +45,16 @@ def test_writable_rest_basic_flow( run_command(f"""CREATE SCHEMA test_writable_rest_basic_flow""", pg_conn) run_command( - f"""CREATE TABLE test_writable_rest_basic_flow.writable_rest(a int) USING iceberg WITH (catalog='rest')""", + f"""CREATE TABLE test_writable_rest_basic_flow.writable_rest USING iceberg WITH (catalog='rest') AS SELECT 100 AS a""", + pg_conn, + ) + run_command( + f"""CREATE TABLE test_writable_rest_basic_flow.writable_rest_2 USING iceberg WITH (catalog='rest') AS SELECT 1000 AS a""", + pg_conn, + ) + + run_command( + f"""CREATE TABLE test_writable_rest_basic_flow.unrelated_table(a int) USING iceberg""", pg_conn, ) @@ -55,6 +65,11 @@ def test_writable_rest_basic_flow( pg_conn, ) + run_command( + f"""CREATE TABLE test_writable_rest_basic_flow.readable_rest_2() USING iceberg WITH (catalog='rest', read_only=True, catalog_table_name='writable_rest_2')""", + pg_conn, + ) + columns = run_query( "SELECT attname FROM pg_attribute WHERE attrelid = 'test_writable_rest_basic_flow.readable_rest'::regclass and attnum > 0", pg_conn, @@ -62,6 +77,69 @@ def test_writable_rest_basic_flow( assert len(columns) == 1 assert columns[0][0] == "a" + columns = run_query( + "SELECT attname FROM pg_attribute WHERE attrelid = 'test_writable_rest_basic_flow.readable_rest_2'::regclass and attnum > 0", + pg_conn, + ) + assert len(columns) == 1 + assert columns[0][0] == "a" + + run_command( + f"""INSERT INTO test_writable_rest_basic_flow.writable_rest VALUES (101)""", + pg_conn, + ) + + run_command( + f"""INSERT INTO test_writable_rest_basic_flow.writable_rest_2 VALUES (1001)""", + pg_conn, + ) + pg_conn.commit() + + res = run_query( + "SELECT * FROM test_writable_rest_basic_flow.readable_rest ORDER BY a ASC", + pg_conn, + ) + assert len(res) == 2 + assert res[0][0] == 100 + assert res[1][0] == 101 + + res = run_query( + "SELECT * FROM test_writable_rest_basic_flow.readable_rest_2 ORDER BY a ASC", + pg_conn, + ) + assert len(res) == 2 + assert res[0][0] == 1000 + assert res[1][0] == 1001 + + # now, each table modified twice in the same tx + run_command( + f""" + INSERT INTO test_writable_rest_basic_flow.writable_rest VALUES (102); + INSERT INTO test_writable_rest_basic_flow.writable_rest VALUES (103); + + INSERT INTO test_writable_rest_basic_flow.writable_rest_2 VALUES (1002); + INSERT INTO test_writable_rest_basic_flow.writable_rest_2 VALUES (1003); + + INSERT INTO test_writable_rest_basic_flow.unrelated_table VALUES (2000); + """, + pg_conn, + ) + pg_conn.commit() + + res = run_query( + "SELECT * FROM test_writable_rest_basic_flow.readable_rest ORDER BY 1 ASC", + pg_conn, + ) + assert len(res) == 4 + assert res == [[100], [101], [102], [103]] + + res = run_query( + "SELECT * FROM test_writable_rest_basic_flow.readable_rest_2 ORDER BY 1 ASC", + pg_conn, + ) + assert len(res) == 4 + assert res == [[1000], [1001], [1002], [1003]] + run_command(f"""DROP SCHEMA test_writable_rest_basic_flow CASCADE""", pg_conn) pg_conn.commit()