Commit 34f723d
Writable Iceberg tables with REST catalog support (#68)
In this PR, we add support for writable REST catalog tables. The API may still change, so I refrain from documenting it in the PR description. Once we finalise the APIs, the best place for the documentation would be https://github.com/Snowflake-Labs/pg_lake/blob/main/docs/iceberg-tables.md, so stay tuned for that.

In an earlier set of PRs (#47, #49, #51, #52 and #56) we prototyped writable REST catalog support in separate stages. However, it turned out that a single PR is a better way to handle these closely related commits, and maintaining that set of PRs separately seemed like overkill.

This new table type shares almost the same architecture as Iceberg tables with `catalog=postgres`.

#### Similarities

- Tables are tracked in the `pg_lake` catalogs such as `lake_table.files`, `lake_table.data_file_column_stats` and `lake_iceberg.tables_internal`.
- All metadata handling follows the `ApplyIcebergMetadataChanges()` logic. Instead of generating a new `metadata.json` as we do for `catalog=postgres`, for these tables we collect the changes made in the transaction and apply them to the REST catalog right after the transaction commits in Postgres.

#### Differences

- The `metadata_location` column in `lake_iceberg.tables_internal` is always `NULL`.
- RENAME TABLE / SET SCHEMA etc. are not supported.

#### Some other notes on the implementation & design

- We first `COMMIT` in Postgres, then, in a post-commit hook, send a `POST` request to the REST catalog (see the sketch after the file-change summary below). So it is possible that the changes are committed in Postgres but not in the REST catalog. This is a known limitation, and we'll have follow-up PRs to make sure we can recover from this situation.
- Creating a table and modifying it in the same Postgres transaction cannot be committed atomically in the REST catalog; there is no such API. So there are additional error scenarios where the table creation is committed in the REST catalog but the rest of the statement is not (say, the data load of a CTAS). This is an unfortunate limitation that we inherit from the REST catalog APIs.
- Our implementation currently assumes that Postgres is the single writer to this table in the REST catalog, so a concurrent modification breaks the table from the Postgres side. For now, this is the current state; we plan to improve it in the future.

#### TODO:

- [x] `DROP partition_by` is not working (fixed by #79)
- [x] Concurrency
- [x] Certain DDLs do not work (e.g., ADD COLUMN with defaults); prevent them much earlier
- [x] VACUUM regression tests
- [x] VACUUM failures (e.g., do we clean up properly?)
- [x] VACUUM (ICEBERG)
- [x] auto-vacuum test
- [x] Truncate test
- [x] savepoint
- [x] Complex TX test
- [x] Column names with quotes
- [x] Add column + add partition by + drop column in the same tx
- [x] Tests for read from postgres / iceberg, modify REST (or the other way around)
- [x] Zero column table?
- [x] DROP TABLE implemented, but needs tests (e.g., create + drop in the same tx; drop table removes the metadata from the REST catalog, etc.)
- [x] `SET partition_by` to an already existing partition by is not supported in Polaris. We should skip sending such requests and instead only send `set partition_spec` alone. (fixed by #79)
- [ ] Recovery after failures (e.g., re-sync the previous snapshot/DDL) [Follow-up PR needed]
- [x] Cache access token; currently we fetch one on every REST request interaction [Follow-up PR needed]
- [x] Cancel query
- [x] sequences / serial / generated columns etc.
- [x] All data types
- [ ] Docs [Follow-up PR needed]
1 parent fa0a562 · commit 34f723d

24 files changed: +6802 −124 lines
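Before the file-by-file diff, here is a rough, hedged sketch of the post-commit flow described in the notes above: requests staged during the transaction are flushed to the REST catalog only after Postgres has committed. `stagedRestCatalogRequests` and `SendRestCatalogTransactionCommit()` are hypothetical names used for illustration; `RegisterXactCallback()` and `XACT_EVENT_COMMIT` are standard Postgres APIs, but the actual hook wiring in pg_lake may differ.

```c
#include "postgres.h"
#include "access/xact.h"
#include "nodes/pg_list.h"

/* hypothetical: RestCatalogRequest entries collected while the transaction runs */
static List *stagedRestCatalogRequests = NIL;

/* hypothetical: concatenates the staged bodies and POSTs them to the REST catalog */
extern void SendRestCatalogTransactionCommit(List *requests);

static void
RestCatalogXactCallback(XactEvent event, void *arg)
{
	if (event == XACT_EVENT_COMMIT && stagedRestCatalogRequests != NIL)
	{
		/*
		 * Postgres has already committed at this point, so a failure here
		 * leaves the REST catalog behind Postgres; recovery is a planned
		 * follow-up (see the TODO list above).
		 */
		SendRestCatalogTransactionCommit(stagedRestCatalogRequests);
		stagedRestCatalogRequests = NIL;
	}
	else if (event == XACT_EVENT_ABORT)
	{
		/* the transaction rolled back, so the staged requests are dropped */
		stagedRestCatalogRequests = NIL;
	}
}

void
_PG_init(void)
{
	RegisterXactCallback(RestCatalogXactCallback, NULL);
}
```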

pg_lake_iceberg/include/pg_lake/iceberg/api/table_metadata.h
Lines changed: 1 addition & 1 deletion

@@ -35,7 +35,7 @@ extern PGDLLEXPORT char *GenerateRemoteMetadataFilePath(int version, const char
 extern PGDLLEXPORT void UploadTableMetadataToURI(IcebergTableMetadata * tableMetadata, char *metadataURI);
 extern PGDLLEXPORT void AdjustAndRetainMetadataLogs(IcebergTableMetadata * metadata, char *prevMetadataPath, size_t snapshotLogLength, int64_t prev_last_updated_ms);
 extern PGDLLEXPORT void UpdateLatestSnapshot(IcebergTableMetadata * tableMetadata, IcebergSnapshot * newSnapshot);
-extern PGDLLEXPORT bool RemoveOldSnapshotsFromMetadata(Oid relationId, IcebergTableMetadata * metadata, bool isVerbose);
+extern PGDLLEXPORT List *RemoveOldSnapshotsFromMetadata(Oid relationId, IcebergTableMetadata * metadata, bool isVerbose);
 extern PGDLLEXPORT void GenerateSnapshotLogEntries(IcebergTableMetadata * metadata);
 extern PGDLLEXPORT int FindLargestPartitionFieldId(IcebergPartitionSpec * newSpec);
 extern PGDLLEXPORT void AppendCurrentPostgresSchema(Oid relationId, IcebergTableMetadata * metadata,

pg_lake_iceberg/include/pg_lake/iceberg/catalog.h
Lines changed: 2 additions & 0 deletions

@@ -55,6 +55,8 @@ extern PGDLLEXPORT void DeleteInternalIcebergCatalogTable(Oid relationId);
 
 extern PGDLLEXPORT List *GetAllInternalIcebergRelationIds(void);
 extern PGDLLEXPORT char *GetIcebergMetadataLocation(Oid relationId, bool forUpdate);
+extern PGDLLEXPORT void LockIcebergPgLakeCatalogForUpdate(Oid relationId);
+extern PGDLLEXPORT char *GetIcebergCatalogMetadataLocation(Oid relationId, bool forUpdate);
 extern PGDLLEXPORT char *GetIcebergCatalogPreviousMetadataLocation(Oid relationId, bool forUpdate);
 extern PGDLLEXPORT void UpdateExternalCatalogMetadataLocation(char *catalogName, char *schemaName, char *tableName, const char *metadataLocation,
 															  const char *previousMetadataLocation);

pg_lake_iceberg/include/pg_lake/iceberg/metadata_operations.h
Lines changed: 1 addition & 1 deletion

@@ -19,6 +19,6 @@
 
 #include "nodes/pg_list.h"
 
-extern PGDLLEXPORT void ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allTransforms, bool isVerbose);
+extern PGDLLEXPORT List *ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allTransforms, bool isVerbose);
 extern PGDLLEXPORT bool ShouldSkipMetadataChangeToIceberg(List *metadataOperationTypes);
 extern PGDLLEXPORT List *GetMetadataOperationTypes(List *metadataOperations);

pg_lake_iceberg/include/pg_lake/iceberg/metadata_spec.h
Lines changed: 2 additions & 0 deletions

@@ -290,3 +290,5 @@ typedef struct IcebergTableMetadata
 
 extern PGDLLEXPORT IcebergTableMetadata * ReadIcebergTableMetadata(const char *tableMetadataPath);
 extern PGDLLEXPORT char *WriteIcebergTableMetadataToJson(IcebergTableMetadata * metadata);
+extern PGDLLEXPORT void AppendIcebergTableSchemaForRestCatalog(StringInfo command, IcebergTableSchema * schemas, size_t schemas_length);
+extern PGDLLEXPORT void AppendIcebergPartitionSpecFields(StringInfo command, IcebergPartitionSpecField * fields, size_t fields_length);

pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h
Lines changed: 46 additions & 3 deletions

@@ -20,8 +20,10 @@
 #include "postgres.h"
 #include "pg_lake/http/http_client.h"
 #include "pg_lake/util/rel_utils.h"
+#include "pg_lake/parquet/field.h"
+#include "pg_lake/iceberg/api/snapshot.h"
 
-extern char *RestCatalogHost;
+extern PGDLLEXPORT char *RestCatalogHost;
 extern char *RestCatalogClientId;
 extern char *RestCatalogClientSecret;
 
@@ -30,11 +32,43 @@ extern char *RestCatalogClientSecret;
 #define REST_CATALOG_NAMESPACE_NAME "%s/api/catalog/v1/%s/namespaces/%s"
 #define REST_CATALOG_NAMESPACE "%s/api/catalog/v1/%s/namespaces"
 
+#define REST_CATALOG_TABLE "%s/api/catalog/v1/%s/namespaces/%s/tables/%s"
+#define REST_CATALOG_TABLES "%s/api/catalog/v1/%s/namespaces/%s/tables"
+
 #define REST_CATALOG_AUTH_TOKEN_PATH "%s/api/catalog/v1/oauth/tokens"
-#define GET_REST_CATALOG_METADATA_LOCATION "%s/api/catalog/v1/%s/namespaces/%s/tables/%s"
 
-extern PGDLLEXPORT char *RestCatalogFetchAccessToken(void);
+#define REST_CATALOG_TRANSACTION_COMMIT "%s/api/catalog/v1/%s/transactions/commit"
+
+typedef enum RestCatalogOperationType
+{
+	REST_CATALOG_CREATE_TABLE = 0,
+	REST_CATALOG_ADD_SNAPSHOT = 1,
+	REST_CATALOG_ADD_SCHEMA = 2,
+	REST_CATALOG_SET_CURRENT_SCHEMA = 3,
+	REST_CATALOG_ADD_PARTITION = 4,
+	REST_CATALOG_REMOVE_SNAPSHOT = 5,
+	REST_CATALOG_DROP_TABLE = 6,
+	REST_CATALOG_SET_DEFAULT_PARTITION_ID = 7,
+} RestCatalogOperationType;
+
+
+typedef struct RestCatalogRequest
+{
+	Oid			relationId;
+	RestCatalogOperationType operationType;
+
+	/*
+	 * For each request, holds the "action" part of the request body. We
+	 * concatenate all requests from multiple tables into a single transaction
+	 * commit request. The only exception is CREATE/DROP table, where body
+	 * holds the full request body.
+	 */
+	char	   *body;
+} RestCatalogRequest;
+
 extern PGDLLEXPORT void RegisterNamespaceToRestCatalog(const char *catalogName, const char *namespaceName);
+extern PGDLLEXPORT void StartStageRestCatalogIcebergTableCreate(Oid relationId);
+extern PGDLLEXPORT char *FinishStageRestCatalogIcebergTableCreateRestRequest(Oid relationId, DataFileSchema * dataFileSchema, List *partitionSpecs);
 extern PGDLLEXPORT void ErrorIfRestNamespaceDoesNotExist(const char *catalogName, const char *namespaceName);
 extern PGDLLEXPORT char *GetRestCatalogName(Oid relationId);
 extern PGDLLEXPORT char *GetRestCatalogNamespace(Oid relationId);
@@ -43,3 +77,12 @@ extern PGDLLEXPORT bool IsReadOnlyRestCatalogIcebergTable(Oid relationId);
 extern PGDLLEXPORT char *GetMetadataLocationFromRestCatalog(const char *restCatalogName, const char *namespaceName,
 															const char *relationName);
 extern PGDLLEXPORT char *GetMetadataLocationForRestCatalogForIcebergTable(Oid relationId);
+extern PGDLLEXPORT void ReportHTTPError(HttpResult httpResult, int level);
+extern PGDLLEXPORT List *PostHeadersWithAuth(void);
+extern PGDLLEXPORT List *DeleteHeadersWithAuth(void);
+extern PGDLLEXPORT RestCatalogRequest * GetAddSnapshotCatalogRequest(IcebergSnapshot * newSnapshot, Oid relationId);
+extern PGDLLEXPORT RestCatalogRequest * GetAddSchemaCatalogRequest(Oid relationId, DataFileSchema * dataFileSchema);
+extern PGDLLEXPORT RestCatalogRequest * GetSetCurrentSchemaCatalogRequest(Oid relationId, int32_t schemaId);
+extern PGDLLEXPORT RestCatalogRequest * GetAddPartitionCatalogRequest(Oid relationId, List *partitionSpec);
+extern PGDLLEXPORT RestCatalogRequest * GetSetPartitionDefaultIdCatalogRequest(Oid relationId, int specId);
+extern PGDLLEXPORT RestCatalogRequest * GetRemoveSnapshotCatalogRequest(List *removedSnapshotIds, Oid relationId);
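The `RestCatalogRequest.body` comment above implies a single `transactions/commit` request built by joining the per-table action bodies. Below is a minimal sketch of how such a payload could be assembled; the `"table-changes"` envelope is an assumption based on the Iceberg REST `CommitTransactionRequest` schema, and the exclusion of CREATE/DROP TABLE requests follows the header comment rather than the actual pg_lake builder.

```c
#include "postgres.h"
#include "lib/stringinfo.h"
#include "nodes/pg_list.h"
#include "pg_lake/rest_catalog/rest_catalog.h"

/*
 * Hedged sketch: join the staged per-table "action" bodies into a single
 * transactions/commit payload. pg_lake's real builder may structure the
 * JSON envelope differently.
 */
static char *
BuildTransactionCommitBody(List *requests)
{
	StringInfoData body;
	ListCell   *cell;
	bool		first = true;

	initStringInfo(&body);
	appendStringInfoString(&body, "{\"table-changes\": [");

	foreach(cell, requests)
	{
		RestCatalogRequest *request = (RestCatalogRequest *) lfirst(cell);

		/*
		 * Per the header comment, CREATE/DROP TABLE requests carry a full
		 * request body of their own and are not part of the shared commit.
		 */
		if (request->operationType == REST_CATALOG_CREATE_TABLE ||
			request->operationType == REST_CATALOG_DROP_TABLE)
			continue;

		if (!first)
			appendStringInfoChar(&body, ',');
		appendStringInfoString(&body, request->body);
		first = false;
	}

	appendStringInfoString(&body, "]}");

	return body.data;
}
```

Presumably the resulting string is then POSTed to the URL formed from `REST_CATALOG_TRANSACTION_COMMIT` (host plus catalog prefix), with `PostHeadersWithAuth()` supplying the authorization header, but that wiring is not shown in this header.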

pg_lake_iceberg/src/iceberg/api/table_metadata.c
Lines changed: 19 additions & 6 deletions

@@ -215,13 +215,13 @@ AddIcebergSnapshotToMetadata(IcebergTableMetadata * metadata, IcebergSnapshot *
  * It only does the in-memory operation, the caller is responsible for
  * persisting the changes.
  */
-bool
+List *
 RemoveOldSnapshotsFromMetadata(Oid relationId, IcebergTableMetadata * metadata, bool isVerbose)
 {
 	if (metadata->snapshots_length == 0)
 	{
 		/* no snapshots yet, not possible to trigger, but let's be defensive */
-		return false;
+		return NIL;
 	}
 
 	/*
@@ -242,7 +242,7 @@ RemoveOldSnapshotsFromMetadata(Oid relationId, IcebergTableMetadata * metadata,
 
 	if (expiredSnapshotCount == 0)
 		/* no snapshots to expire */
-		return false;
+		return NIL;
 
 	/* we might expire all snapshots, always retain at least 1 snapshot */
 	if (nonExpiredSnapshotCount == 0)
@@ -261,8 +261,15 @@ RemoveOldSnapshotsFromMetadata(Oid relationId, IcebergTableMetadata * metadata,
 		nonExpiredSnapshotCount = 1;
 	}
 
+	List	   *expiredSnapshotIds = NIL;
+
 	for (int snapshotIndex = 0; snapshotIndex < expiredSnapshotCount; snapshotIndex++)
 	{
+		int64_t    *expiredSnapshotIdPtr = palloc(sizeof(int64_t));
+
+		*expiredSnapshotIdPtr = expiredSnapshots[snapshotIndex].snapshot_id;
+		expiredSnapshotIds = lappend(expiredSnapshotIds, expiredSnapshotIdPtr);
+
 		ereport(isVerbose ? INFO : DEBUG1,
 				(errmsg("expiring snapshot %" PRId64 " from %s",
 						expiredSnapshots[snapshotIndex].snapshot_id,
@@ -271,10 +278,16 @@ RemoveOldSnapshotsFromMetadata(Oid relationId, IcebergTableMetadata * metadata,
 
 	DeleteUnreferencedFiles(relationId, metadata, expiredSnapshots, expiredSnapshotCount, nonExpiredSnapshots, nonExpiredSnapshotCount);
 
-	metadata->snapshots = nonExpiredSnapshots;
-	metadata->snapshots_length = nonExpiredSnapshotCount;
+	IcebergCatalogType catalogType = GetIcebergCatalogType(relationId);
+	bool		writableRestCatalogTable = catalogType == REST_CATALOG_READ_WRITE;
+
+	if (!writableRestCatalogTable)
+	{
+		metadata->snapshots = nonExpiredSnapshots;
+		metadata->snapshots_length = nonExpiredSnapshotCount;
+	}
 
-	return true;
+	return expiredSnapshotIds;
 }
pg_lake_iceberg/src/iceberg/catalog.c
Lines changed: 60 additions & 13 deletions

@@ -33,7 +33,6 @@
 
 char *IcebergDefaultLocationPrefix = NULL;
 
-static char *GetIcebergCatalogMetadataLocation(Oid relationId, bool forUpdate);
 static char *GetIcebergExternalMetadataLocation(Oid relationId);
 static char *GetIcebergCatalogMetadataLocationInternal(Oid relationId, bool isPrevMetadata, bool forUpdate);
 static char *GetIcebergCatalogColumnInternal(Oid relationId, char *columnName, bool forUpdate, bool errorIfNotFound);
@@ -66,7 +65,7 @@ InsertInternalIcebergCatalogTable(Oid relationId, const char *metadataLocation,
 
 	DECLARE_SPI_ARGS(3);
 	SPI_ARG_VALUE(1, OIDOID, relationId, false);
-	SPI_ARG_VALUE(2, TEXTOID, metadataLocation, false);
+	SPI_ARG_VALUE(2, TEXTOID, metadataLocation, metadataLocation == NULL);
 	SPI_ARG_VALUE(3, BOOLOID, hasCustomLocation, false);
 
 	SPI_START();
@@ -358,19 +357,41 @@ GetIcebergMetadataLocation(Oid relationId, bool forUpdate)
 {
 	Assert(IsIcebergTable(relationId));
 
-	if (IsInternalIcebergTable(relationId))
-	{
-		return GetIcebergCatalogMetadataLocation(relationId, forUpdate);
-	}
-	else
+	IcebergCatalogType catalogType = GetIcebergCatalogType(relationId);
+
+	if (IsExternalIcebergTable(relationId) || catalogType == REST_CATALOG_READ_WRITE)
 	{
-		if (forUpdate)
+		/*
+		 * We get the metadata location from the iceberg catalog table for
+		 * internal iceberg tables, except for REST_CATALOG_READ_WRITE.
+		 *
+		 * Writable rest catalog iceberg tables are a bit different. They are
+		 * internal iceberg tables, all their metadata is stored in our
+		 * catalogs, such as data files, column stats, etc. However, their
+		 * metadata location is stored in the rest catalog itself. So, we need
+		 * to fetch the metadata location from the rest catalog for those
+		 * tables, see below.
+		 */
+		if (forUpdate && catalogType != REST_CATALOG_READ_WRITE)
 			ereport(ERROR,
 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 					 errmsg("Updating iceberg metadata is not supported for external Iceberg tables")));
+		else if (forUpdate && catalogType == REST_CATALOG_READ_WRITE)
+		{
+			/*
+			 * But we still need to acquire locks on the iceberg catalog table
+			 * row for internal iceberg tables to prevent concurrent updates.
+			 * So, we call GetIcebergCatalogMetadataLocation().
+			 */
+			LockIcebergPgLakeCatalogForUpdate(relationId);
+		}
 
 		return GetIcebergExternalMetadataLocation(relationId);
 	}
+	else
+	{
+		return GetIcebergCatalogMetadataLocation(relationId, forUpdate);
+	}
 }
 
 
@@ -381,7 +402,7 @@ GetIcebergMetadataLocation(Oid relationId, bool forUpdate)
  * If the metadata row for the table is going to be updated, the caller should
  * pass forUpdate as true.
  */
-static char *
+char *
 GetIcebergCatalogMetadataLocation(Oid relationId, bool forUpdate)
 {
 	Assert(IsInternalIcebergTable(relationId));
@@ -390,6 +411,32 @@ GetIcebergCatalogMetadataLocation(Oid relationId, bool forUpdate)
 }
 
 
+/*
+ * LockIcebergPgLakeCatalogForUpdate acquires necessary locks on the
+ * iceberg catalog table row for the given relation to serialize concurrent
+ * updates. Note that normally GetIcebergCatalogMetadataLocation() already does
+ * the trick for us. This function is only needed in cases where we need to
+ * acquire the locks but don't have the metadata location itself, such as
+ * writable rest catalog iceberg tables.
+ */
+void
+LockIcebergPgLakeCatalogForUpdate(Oid relationId)
+{
+	/*
+	 * We don't actually need the metadata location here. We just need to
+	 * acquire the necessary locks on the iceberg catalog table row to
+	 * serialize concurrent updates.
+	 *
+	 * Note that we cannot skip reading the actual metadata location and
+	 * returning it here as other callers might depend on the returned value.
+	 * So, we call GetIcebergCatalogMetadataLocation() as usual.
+	 */
+	bool		forUpdate = true;
+
+	GetIcebergCatalogMetadataLocationInternal(relationId, false, forUpdate);
+}
+
+
 /*
  * GetIcebergExternalMetadataLocation returns the metadata location for an external iceberg table.
  */
@@ -402,7 +449,7 @@ GetIcebergExternalMetadataLocation(Oid relationId)
 
 	char	   *currentMetadataPath = NULL;
 
-	if (icebergCatalogType == REST_CATALOG_READ_ONLY)
+	if (icebergCatalogType == REST_CATALOG_READ_ONLY || icebergCatalogType == REST_CATALOG_READ_WRITE)
 	{
 		currentMetadataPath = GetMetadataLocationForRestCatalogForIcebergTable(relationId);
 	}
@@ -549,13 +596,13 @@ bool
 RelationExistsInTheIcebergCatalog(Oid relationId)
 {
 	bool		forUpdate = false;
-	char	   *columnName = "metadata_location";
+	char	   *columnName = "table_name";
 	bool		errorIfNotFound = false;
 
-	char	   *metadataLocation =
+	char	   *tableName =
 		GetIcebergCatalogColumnInternal(relationId, columnName, forUpdate, errorIfNotFound);
 
-	return metadataLocation != NULL;
+	return tableName != NULL;
 }