Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pg_lake_iceberg/include/pg_lake/iceberg/metadata_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,5 @@ typedef struct IcebergTableMetadata

extern PGDLLEXPORT IcebergTableMetadata * ReadIcebergTableMetadata(const char *tableMetadataPath);
extern PGDLLEXPORT char *WriteIcebergTableMetadataToJson(IcebergTableMetadata * metadata);
extern PGDLLEXPORT void AppendIcebergTableSchemaForRestCatalogStage(StringInfo command, IcebergTableSchema * schemas, size_t schemas_length);
extern PGDLLEXPORT void AppendIcebergPartitionSpecFields(StringInfo command, IcebergPartitionSpecField * fields, size_t fields_length);
12 changes: 10 additions & 2 deletions pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
#include "postgres.h"
#include "pg_lake/http/http_client.h"
#include "pg_lake/util/rel_utils.h"
#include "pg_lake/parquet/field.h"

extern char *RestCatalogHost;
extern PGDLLEXPORT char *RestCatalogHost;
extern char *RestCatalogClientId;
extern char *RestCatalogClientSecret;

Expand All @@ -30,14 +31,19 @@ extern char *RestCatalogClientSecret;
#define REST_CATALOG_NAMESPACE_NAME "%s/api/catalog/v1/%s/namespaces/%s"
#define REST_CATALOG_NAMESPACE "%s/api/catalog/v1/%s/namespaces"

#define REST_CATALOG_TABLE "%s/api/catalog/v1/%s/namespaces/%s/tables/%s"
#define REST_CATALOG_TABLES "%s/api/catalog/v1/%s/namespaces/%s/tables"

#define REST_CATALOG_AUTH_TOKEN_PATH "%s/api/catalog/v1/oauth/tokens"
#define GET_REST_CATALOG_METADATA_LOCATION "%s/api/catalog/v1/%s/namespaces/%s/tables/%s"

extern PGDLLEXPORT char *RestCatalogFetchAccessToken(void);
extern PGDLLEXPORT void StartStageRestCatalogIcebergTableCreate(Oid relationId);
extern PGDLLEXPORT char *FinishStageRestCatalogIcebergTableCreateRestRequest(Oid relationId, DataFileSchema * dataFileSchema, List *partitionSpecs);
extern PGDLLEXPORT void RegisterNamespaceToRestCatalog(const char *catalogName, const char *namespaceName,
bool hasRestCatalogReadOnlyOption);
extern PGDLLEXPORT void ErrorIfRestNamespaceDoesNotExist(const char *catalogName, const char *namespaceName);
extern PGDLLEXPORT IcebergCatalogType GetIcebergCatalogType(Oid relationId);
extern PGDLLEXPORT char *GetRestCatalogName(Oid relationId);
extern PGDLLEXPORT char *GetRestCatalogNamespace(Oid relationId);
extern PGDLLEXPORT char *GetRestCatalogTableName(Oid relationId);
extern PGDLLEXPORT bool HasRestCatalogTableOption(List *options);
Expand All @@ -46,3 +52,5 @@ extern PGDLLEXPORT bool IsReadOnlyRestCatalogIcebergTable(Oid relationId);
extern PGDLLEXPORT char *GetMetadataLocationFromRestCatalog(const char *restCatalogName, const char *namespaceName,
const char *relationName);
extern PGDLLEXPORT char *GetMetadataLocationForRestCatalogForIcebergTable(Oid relationId);
extern PGDLLEXPORT void ReportHTTPError(HttpResult httpResult, int level);
extern PGDLLEXPORT List *PostHeadersWithAuth(void);
4 changes: 2 additions & 2 deletions pg_lake_iceberg/src/iceberg/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ InsertInternalIcebergCatalogTable(Oid relationId, const char *metadataLocation,

DECLARE_SPI_ARGS(3);
SPI_ARG_VALUE(1, OIDOID, relationId, false);
SPI_ARG_VALUE(2, TEXTOID, metadataLocation, false);
SPI_ARG_VALUE(2, TEXTOID, metadataLocation, metadataLocation == NULL);
SPI_ARG_VALUE(3, BOOLOID, hasCustomLocation, false);

SPI_START();
Expand Down Expand Up @@ -475,7 +475,7 @@ bool
RelationExistsInTheIcebergCatalog(Oid relationId)
{
bool forUpdate = false;
char *columnName = "metadata_location";
char *columnName = "table_name";
bool errorIfNotFound = false;

char *metadataLocation =
Expand Down
65 changes: 61 additions & 4 deletions pg_lake_iceberg/src/iceberg/metadata_operations.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,33 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT

Assert(!ShouldSkipMetadataChangeToIceberg(metadataOperationTypes));
#endif
IcebergCatalogType catalogType = GetIcebergCatalogType(relationId);
bool writableRestCatalogTable = catalogType == REST_CATALOG_READ_WRITE;

/* read the iceberg metadata for the table */
bool forUpdate = true;
char *metadataPath = GetIcebergCatalogMetadataLocation(relationId, forUpdate);

bool createNewTable = HasCreateTableOperation(metadataOperations);

IcebergTableMetadata *metadata = (createNewTable) ?
GenerateInitialIcebergTableMetadata(relationId) :
ReadIcebergTableMetadata(metadataPath);
IcebergTableMetadata *metadata = NULL;

if (writableRestCatalogTable && !createNewTable)
{
/*
* Writable rest catalog iceberg tables have their metadata updated in
* the catalog itself. We fetch the metadata from the rest catalog. If
* new table, we generate initial metadata below.
*/
metadataPath = GetMetadataLocationForRestCatalogForIcebergTable(relationId);
metadata = ReadIcebergTableMetadata(metadataPath);
}
else
{
metadata = (createNewTable) ?
GenerateInitialIcebergTableMetadata(relationId) :
ReadIcebergTableMetadata(metadataPath);
}


int64_t prevLastUpdatedMs = metadata->last_updated_ms;

Expand All @@ -184,6 +201,11 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT
if (builder->createTable || builder->regenerateSchema)
{
AppendCurrentPostgresSchema(relationId, metadata, builder->schema);

/*
* TODO: Create RestCatalogRequest for updating the schema in the
* writable rest catalog iceberg table.
*/
}

if (builder->createTable || builder->regeneratePartitionSpec)
Expand All @@ -197,6 +219,11 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT
IcebergPartitionSpec *newSpec = lfirst(newSpecCell);

AppendPartitionSpec(metadata, newSpec);

/*
* TODO: Create RestCatalogRequest for updating the partitioning
* in the writable rest catalog iceberg table.
*/
}
}

Expand All @@ -216,6 +243,11 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT
UpdateLatestSnapshot(metadata, newSnapshot);

createNewSnapshot = true;

/*
* TODO: Create RestCatalogRequest for setting the current_snapshot_id
* in the writable rest catalog iceberg table.
*/
}

/* if we need to expire old snapshots, we do it here */
Expand All @@ -225,13 +257,38 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT
RemoveOldSnapshotsFromMetadata(relationId, metadata, isVerbose);

if (expiredSnapshots)
{
createNewSnapshot = true;

/*
* TODO: Create RestCatalogRequest for removing old snapshots in
* the writable rest catalog iceberg table.
*/
}
}

/* if there were no changes to the Iceberg table, we are done */
if (!createNewSnapshot && !createNewTable)
return;

if (writableRestCatalogTable)
{

if (createNewSnapshot)
{
/*
* TODO: Create RestCatalogRequest for adding the new snapshot in
* the writable rest catalog iceberg table.
*/
}

/*
* We are done, writable rest catalog iceberg tables have their
* metadata updated in the catalog itself.
*/
return;
}

/* add the new snapshot to the snapshot log */
GenerateSnapshotLogEntries(metadata);

Expand Down
44 changes: 42 additions & 2 deletions pg_lake_iceberg/src/iceberg/write_table_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ static void AppendProperties(StringInfo command, Property * properties, size_t p
static void AppendField(StringInfo command, Field * field);
static void AppendIcebergStructFields(StringInfo command, FieldStructElement * fields, size_t fields_length);
static void AppendIcebergTableSchemas(StringInfo command, IcebergTableSchema * schemas, size_t schemas_length);
static void AppendIcebergPartitionSpecFields(StringInfo command, IcebergPartitionSpecField * fields, size_t fields_length);
static void AppendIcebergPartitionSpecs(StringInfo command, IcebergPartitionSpec * specs, size_t specs_length);
static void AppendIcebergSnapshots(StringInfo command, IcebergSnapshot * snapshots, size_t snapshots_length);
static void AppendIcebergSnapshotLogEntries(StringInfo command, IcebergSnapshotLogEntry * entries, size_t entries_length);
Expand Down Expand Up @@ -244,6 +243,47 @@ AppendIcebergTableSchemas(StringInfo command, IcebergTableSchema * schemas, size
appendStringInfoString(command, "]");
}


/*
* Similar to AppendIcebergTableSchemas, but specifically for Rest Catalog stage
* API calls.
*/
void
AppendIcebergTableSchemaForRestCatalogStage(StringInfo command, IcebergTableSchema * schemas, size_t schemas_length)
{
appendStringInfoString(command, "\"schema\":");

for (size_t i = 0; i < schemas_length; i++)
{
appendStringInfoString(command, "{");

/* append type */
appendJsonString(command, "type", schemas[i].type);

if (schemas[i].identifier_field_ids_length > 0)
{
appendStringInfoString(command, ", ");
appendStringInfoString(command, "\"identifier-field-ids\":");
AppendIntArray(command, schemas[i].identifier_field_ids,
schemas[i].identifier_field_ids_length);
}

/* Append fields */
appendStringInfoString(command, ", ");

appendStringInfoString(command, "\"fields\":");
AppendIcebergStructFields(command, schemas[i].fields, schemas[i].fields_length);

appendStringInfoString(command, "}");

if (i < schemas_length - 1)
{
appendStringInfoString(command, ", ");
}
}
}


static void
AppendIcebergPartitionSpecs(StringInfo command, IcebergPartitionSpec * specs, size_t specs_length)
{
Expand Down Expand Up @@ -681,7 +721,7 @@ AppendIcebergStructFields(StringInfo command, FieldStructElement * fields, size_
}


static void
void
AppendIcebergPartitionSpecFields(StringInfo command, IcebergPartitionSpecField * fields, size_t fields_length)
{
appendStringInfoString(command, "[");
Expand Down
Loading