Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@

#include "nodes/pg_list.h"

extern PGDLLEXPORT void ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allTransforms, bool isVerbose);
extern PGDLLEXPORT List *ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allTransforms, bool isVerbose);
extern PGDLLEXPORT bool ShouldSkipMetadataChangeToIceberg(List *metadataOperationTypes);
extern PGDLLEXPORT List *GetMetadataOperationTypes(List *metadataOperations);
21 changes: 21 additions & 0 deletions pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "pg_lake/http/http_client.h"
#include "pg_lake/util/rel_utils.h"
#include "pg_lake/parquet/field.h"
#include "pg_lake/iceberg/api/snapshot.h"

extern PGDLLEXPORT char *RestCatalogHost;
extern char *RestCatalogClientId;
Expand All @@ -36,6 +37,25 @@ extern char *RestCatalogClientSecret;

#define REST_CATALOG_AUTH_TOKEN_PATH "%s/api/catalog/v1/oauth/tokens"

#define REST_CATALOG_TRANSACTION_COMMIT "%s/api/catalog/v1/%s/transactions/commit"

typedef enum RestCatalogOperationType
{
REST_CATALOG_CREATE_TABLE = 0,
REST_CATALOG_ADD_SNAPSHOT = 1,
REST_CATALOG_ADD_SCHEMA = 2,
REST_CATALOG_ADD_PARTITION = 3,
} RestCatalogOperationType;


typedef struct RestCatalogRequest
{
Oid relationId;
RestCatalogOperationType operationType;

char *body;
} RestCatalogRequest;

extern PGDLLEXPORT char *RestCatalogFetchAccessToken(void);
extern PGDLLEXPORT void StartStageRestCatalogIcebergTableCreate(Oid relationId);
extern PGDLLEXPORT char *FinishStageRestCatalogIcebergTableCreateRestRequest(Oid relationId, DataFileSchema * dataFileSchema, List *partitionSpecs);
Expand All @@ -54,3 +74,4 @@ extern PGDLLEXPORT char *GetMetadataLocationFromRestCatalog(const char *restCata
extern PGDLLEXPORT char *GetMetadataLocationForRestCatalogForIcebergTable(Oid relationId);
extern PGDLLEXPORT void ReportHTTPError(HttpResult httpResult, int level);
extern PGDLLEXPORT List *PostHeadersWithAuth(void);
extern PGDLLEXPORT RestCatalogRequest * GetAddSnapshotCatalogRequest(IcebergSnapshot * newSnapshot, Oid relationId);
38 changes: 22 additions & 16 deletions pg_lake_iceberg/src/iceberg/metadata_operations.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,11 @@ static void DeleteInProgressManifests(Oid relationId, List *manifests);
* ApplyIcebergMetadataChanges applies the given metadata operations to the
* iceberg metadata for the given relation.
*/
void
List *
ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allTransforms, bool isVerbose)
{
List *restCatalogRequests = NIL;

Assert(metadataOperations != NIL);

#ifdef USE_ASSERT_CHECKING
Expand Down Expand Up @@ -244,10 +246,13 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT

createNewSnapshot = true;

/*
* TODO: Create RestCatalogRequest for setting the current_snapshot_id
* in the writable rest catalog iceberg table.
*/
if (writableRestCatalogTable)
{
RestCatalogRequest *request =
GetAddSnapshotCatalogRequest(newSnapshot, relationId);

restCatalogRequests = lappend(restCatalogRequests, request);
}
}

/* if we need to expire old snapshots, we do it here */
Expand All @@ -269,24 +274,18 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT

/* if there were no changes to the Iceberg table, we are done */
if (!createNewSnapshot && !createNewTable)
return;
{
Assert(restCatalogRequests == NIL);
return restCatalogRequests;
}

if (writableRestCatalogTable)
{

if (createNewSnapshot)
{
/*
* TODO: Create RestCatalogRequest for adding the new snapshot in
* the writable rest catalog iceberg table.
*/
}

/*
* We are done, writable rest catalog iceberg tables have their
* metadata updated in the catalog itself.
*/
return;
return restCatalogRequests;
}

/* add the new snapshot to the snapshot log */
Expand Down Expand Up @@ -327,6 +326,13 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT
}

TriggerCatalogExportIfObjectStoreTable(relationId);

/*
* for a non-writable rest table, we should not have any rest catalog
* requests
*/
Assert(restCatalogRequests == NIL);
return restCatalogRequests;
}


Expand Down
41 changes: 41 additions & 0 deletions pg_lake_iceberg/src/rest_catalog/rest_catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "pg_lake/parsetree/options.h"
#include "pg_lake/util/url_encode.h"
#include "pg_lake/util/rel_utils.h"
#include "pg_lake/iceberg/temporal_utils.h"


/* determined by GUC */
Expand Down Expand Up @@ -810,3 +811,43 @@ AppendIcebergPartitionSpecForRestCatalogStage(List *partitionSpecs)
}
return command->data;
}



/*
* GetAddSnapshotCatalogRequest creates a RestCatalogRequest to add a snapshot
* to the rest catalog for the given new snapshot.
*/
RestCatalogRequest *
GetAddSnapshotCatalogRequest(IcebergSnapshot * newSnapshot, Oid relationId)
{
StringInfo body = makeStringInfo();

appendStringInfoString(body,
"{\"action\":\"add-snapshot\",\"snapshot\":{");

appendStringInfo(body, "\"snapshot-id\":%lld", newSnapshot->snapshot_id);
if (newSnapshot->parent_snapshot_id > 0)
appendStringInfo(body, ",\"parent-snapshot-id\":%lld", newSnapshot->parent_snapshot_id);

/* TODO: Polaris doesn't accept 0 sequence number */
appendStringInfo(body, ",\"sequence-number\":%lld", newSnapshot->sequence_number == 0 ? 1 : newSnapshot->sequence_number);
appendStringInfo(body, ",\"timestamp-ms\":%ld", (long) (PostgresTimestampToIcebergTimestampMs())); /* coarse ms */
appendStringInfo(body, ",\"manifest-list\":\"%s\"", newSnapshot->manifest_list);
appendStringInfoString(body, ",\"summary\":{\"operation\": \"append\"}");

if (newSnapshot->schema_id > 0)
appendStringInfo(body, ",\"schema-id\":%d", newSnapshot->schema_id);

appendStringInfoString(body, "}}, "); /* end add-snapshot */

appendStringInfo(body, "{\"action\":\"set-snapshot-ref\", \"type\":\"branch\", \"ref-name\":\"main\", \"snapshot-id\":%lld}", newSnapshot->snapshot_id);

RestCatalogRequest *request = palloc0(sizeof(RestCatalogRequest));

request->relationId = relationId;
request->operationType = REST_CATALOG_ADD_SNAPSHOT;
request->body = body->data;

return request;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "postgres.h"
#include "access/hash.h"
#include "pg_lake/rest_catalog/rest_catalog.h"

typedef struct TableMetadataOperationTracker
{
Expand All @@ -32,20 +33,12 @@ typedef struct TableMetadataOperationTracker
bool relationSnapshotExpirationRequested;
} TableMetadataOperationTracker;

typedef enum RestCatalogOperationType
{
REST_CATALOG_CREATE_ICEBERG_TABLE = 0,
REST_CATALOG_ADD_SNAPSHOT = 1,
REST_CATALOG_ADD_SCHEMA = 2,
REST_CATALOG_ADD_PARTITION = 3,
} RestCatalogOperationType;

extern PGDLLEXPORT void ConsumeTrackedIcebergMetadataChanges(void);
extern PGDLLEXPORT void PostAllRestCatalogRequests(void);
extern PGDLLEXPORT void TrackIcebergMetadataChangesInTx(Oid relationId, List *metadataOperationTypes);
extern PGDLLEXPORT void RecordRestCatalogRequestInTx(Oid relationId, RestCatalogOperationType operationType,
const char *catalogName, const char *catalogNamespace,
const char *catalogTableName, const char *body);
const char *body);
extern PGDLLEXPORT void ResetTrackedIcebergMetadataOperation(void);
extern PGDLLEXPORT void ResetRestCatalogRequests(void);
extern PGDLLEXPORT HTAB *GetTrackedIcebergMetadataOperations(void);
Expand Down
Loading