Skip to content

Commit 3cf0d5e

Browse files
rebase on top 70
1 parent 30c0cde commit 3cf0d5e

File tree

5 files changed

+49
-11
lines changed

5 files changed

+49
-11
lines changed

pg_lake_iceberg/include/pg_lake/iceberg/catalog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ extern PGDLLEXPORT void DeleteInternalIcebergCatalogTable(Oid relationId);
5555

5656
extern PGDLLEXPORT List *GetAllInternalIcebergRelationIds(void);
5757
extern PGDLLEXPORT char *GetIcebergMetadataLocation(Oid relationId, bool forUpdate);
58+
extern PGDLLEXPORT char *GetIcebergCatalogMetadataLocation(Oid relationId, bool forUpdate);
5859
extern PGDLLEXPORT char *GetIcebergCatalogPreviousMetadataLocation(Oid relationId, bool forUpdate);
5960
extern PGDLLEXPORT void UpdateExternalCatalogMetadataLocation(char *catalogName, char *schemaName, char *tableName, const char *metadataLocation,
6061
const char *previousMetadataLocation);

pg_lake_iceberg/src/iceberg/catalog.c

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333

3434
char *IcebergDefaultLocationPrefix = NULL;
3535

36-
static char *GetIcebergCatalogMetadataLocation(Oid relationId, bool forUpdate);
3736
static char *GetIcebergExternalMetadataLocation(Oid relationId);
3837
static char *GetIcebergCatalogMetadataLocationInternal(Oid relationId, bool isPrevMetadata, bool forUpdate);
3938
static char *GetIcebergCatalogColumnInternal(Oid relationId, char *columnName, bool forUpdate, bool errorIfNotFound);
@@ -360,7 +359,37 @@ GetIcebergMetadataLocation(Oid relationId, bool forUpdate)
360359

361360
if (IsInternalIcebergTable(relationId))
362361
{
363-
return GetIcebergCatalogMetadataLocation(relationId, forUpdate);
362+
IcebergCatalogType catalogType = GetIcebergCatalogType(relationId);
363+
364+
/*
365+
* We always get the metadata location from the iceberg catalog table
366+
* for internal iceberg tables.
367+
*
368+
* Writable rest catalog iceberg tables are a bit different. They are
369+
* internal iceberg tables, all their metadata is stored in our
370+
* catalogs, such as data files, column stats, etc. However, their
371+
* metadata location is stored in the rest catalog itself. So, we need
372+
* to fetch the metadata location from the rest catalog for those
373+
* tables, see below.
374+
*
375+
* But we still need to acquire locks on the iceberg catalog table row
376+
* for internal iceberg tables to prevent concurrent updates. So, we
377+
* call GetIcebergCatalogMetadataLocation().
378+
*/
379+
char *metadataLocation = GetIcebergCatalogMetadataLocation(relationId, forUpdate);
380+
381+
if (catalogType == REST_CATALOG_READ_WRITE)
382+
{
383+
/*
384+
* our internal catalog never stores metadata location for
385+
* writable rest catalog tables
386+
*/
387+
Assert(metadataLocation == NULL);
388+
389+
metadataLocation = GetIcebergExternalMetadataLocation(relationId);
390+
}
391+
392+
return metadataLocation;
364393
}
365394
else
366395
{
@@ -381,7 +410,7 @@ GetIcebergMetadataLocation(Oid relationId, bool forUpdate)
381410
* If the metadata row for the table is going to be updated, the caller should
382411
* pass forUpdate as true.
383412
*/
384-
static char *
413+
char *
385414
GetIcebergCatalogMetadataLocation(Oid relationId, bool forUpdate)
386415
{
387416
Assert(IsInternalIcebergTable(relationId));
@@ -402,7 +431,7 @@ GetIcebergExternalMetadataLocation(Oid relationId)
402431

403432
char *currentMetadataPath = NULL;
404433

405-
if (icebergCatalogType == REST_CATALOG_READ_ONLY)
434+
if (icebergCatalogType == REST_CATALOG_READ_ONLY || icebergCatalogType == REST_CATALOG_READ_WRITE)
406435
{
407436
currentMetadataPath = GetMetadataLocationForRestCatalogForIcebergTable(relationId);
408437
}

pg_lake_iceberg/src/iceberg/metadata_operations.c

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,12 +171,16 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT
171171
* Although writable rest catalog iceberg tables have their metadata
172172
* location stored in the rest catalog itself, we still need to read the
173173
* pg_lake metadata as forUpdate=true acquires necessary locks to prevent
174-
* concurrent updates.
174+
* concurrent updates. To achieve this, we use
175+
* GetIcebergCatalogMetadataLocation function as that's the common
176+
* practice in the code.
175177
*/
176-
char *metadataPath = GetIcebergMetadataLocation(relationId, forUpdate);
178+
char *metadataPath = GetIcebergCatalogMetadataLocation(relationId, forUpdate);
179+
177180
bool createNewTable = HasCreateTableOperation(metadataOperations);
178-
181+
179182
IcebergTableMetadata *metadata = NULL;
183+
180184
if (createNewTable)
181185
{
182186
metadata = GenerateInitialIcebergTableMetadata(relationId);
@@ -203,6 +207,7 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT
203207
}
204208
else
205209
{
210+
metadataPath = GetIcebergMetadataLocation(relationId, false);
206211
metadata = ReadIcebergTableMetadata(metadataPath);
207212

208213
prevLastUpdatedMs = metadata->last_updated_ms;

pg_lake_table/src/ddl/drop_table.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ MarkAllReferencedFilesForDeletion(Oid relationId)
463463
TimestampTz orphanedAt = GetCurrentTransactionStartTimestamp();
464464
char *metadataLocation = GetIcebergMetadataLocation(relationId, true);
465465
MemoryContext savedContext = CurrentMemoryContext;
466-
466+
467467
List *allFiles = NIL;
468468
volatile bool success = true;
469469

pg_lake_table/src/transaction/track_iceberg_metadata_changes.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,15 @@ TrackIcebergMetadataChangesInTx(Oid relationId, List *metadataOperationTypes)
101101
return;
102102

103103
/*
104-
* we might also defer acquiring locks to precommit hook but let's keep
105-
* them here to prevent any subtle bug
104+
* We might also defer acquiring locks to precommit hook but let's keep
105+
* them here to prevent any subtle bug. We call
106+
* GetIcebergCatalogMetadataLocation() to acquire the necessary locks, not
107+
* for the actual metadata location as our serialization of iceberg
108+
* metadata changes relies on those locks.
106109
*/
107110
bool forUpdate = true;
108111

109-
GetIcebergMetadataLocation(relationId, forUpdate);
112+
GetIcebergCatalogMetadataLocation(relationId, forUpdate);
110113

111114
ListCell *operationCell = NULL;
112115

0 commit comments

Comments
 (0)