Skip to content

Commit 7cc8b6b

Browse files
WIP, drop table
1 parent 1084ab9 commit 7cc8b6b

File tree

3 files changed

+70
-15
lines changed

3 files changed

+70
-15
lines changed

pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ typedef enum RestCatalogOperationType
4545
REST_CATALOG_ADD_SNAPSHOT = 1,
4646
REST_CATALOG_ADD_SCHEMA = 2,
4747
REST_CATALOG_ADD_PARTITION = 3,
48-
REST_CATALOG_REMOVE_SNAPSHOT = 4
48+
REST_CATALOG_REMOVE_SNAPSHOT = 4,
49+
REST_CATALOG_DROP_TABLE = 5
4950
} RestCatalogOperationType;
5051

5152

@@ -74,6 +75,7 @@ extern PGDLLEXPORT char *GetMetadataLocationFromRestCatalog(const char *restCata
7475
extern PGDLLEXPORT char *GetMetadataLocationForRestCatalogForIcebergTable(Oid relationId);
7576
extern PGDLLEXPORT void ReportHTTPError(HttpResult httpResult, int level);
7677
extern PGDLLEXPORT List *PostHeadersWithAuth(void);
78+
extern PGDLLEXPORT List *DeleteHeadersWithAuth(void);
7779
extern PGDLLEXPORT RestCatalogRequest * GetAddSnapshotCatalogRequest(IcebergSnapshot * newSnapshot, Oid relationId);
7880
extern PGDLLEXPORT RestCatalogRequest * GetAddSchemaCatalogRequest(Oid relationId, DataFileSchema * dataFileSchema);
7981
extern PGDLLEXPORT RestCatalogRequest * GetAddPartitionCatalogRequest(Oid relationId, List *partitionSpec);

pg_lake_iceberg/src/rest_catalog/rest_catalog.c

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,19 @@ PostHeadersWithAuth(void)
440440
pstrdup("Content-Type: application/json"));
441441
}
442442

443+
444+
445+
/*
446+
* Creates the headers for a DELETE request with authentication.
447+
*/
448+
List *
449+
DeleteHeadersWithAuth(void)
450+
{
451+
return list_make1(psprintf("Authorization: Bearer %s", RestCatalogFetchAccessToken()));
452+
}
453+
454+
455+
443456
/*
444457
* Creates the headers for a GET request with authentication.
445458
*/

pg_lake_table/src/transaction/track_iceberg_metadata_changes.c

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ typedef struct RestCatalogRequestPerTable
5353
char *catalogNamespace;
5454
char *catalogTableName;
5555

56-
List *createTableRequests;
56+
RestCatalogRequest *createTableRequest;
57+
RestCatalogRequest *dropTableRequest;
58+
5759
List *tableModifyRequests;
5860
} RestCatalogRequestPerTable;
5961

@@ -205,13 +207,17 @@ PostAllRestCatalogRequests(void)
205207

206208
while ((requestPerTable = hash_seq_search(&status)) != NULL)
207209
{
208-
ListCell *requestCell = NULL;
210+
RestCatalogRequest *createTableRequest = requestPerTable->createTableRequest;
211+
RestCatalogRequest *dropTableRequest = requestPerTable->dropTableRequest;
209212

210-
foreach(requestCell, requestPerTable->createTableRequests)
213+
if (createTableRequest != NULL || dropTableRequest != NULL)
211214
{
212-
RestCatalogRequest *request = (RestCatalogRequest *) lfirst(requestCell);
213-
214-
Assert(request->operationType == REST_CATALOG_CREATE_TABLE);
215+
/*
216+
* We can only have one create or one drop table request per
217+
* table. If a table is created and dropped in the same
218+
* transaction, we skip both requests.
219+
*/
220+
Assert(createTableRequest == NULL || dropTableRequest == NULL);
215221

216222
const char *url =
217223
psprintf(REST_CATALOG_TABLE,
@@ -220,16 +226,37 @@ PostAllRestCatalogRequests(void)
220226
URLEncodePath(requestPerTable->catalogNamespace),
221227
URLEncodePath(requestPerTable->catalogTableName));
222228

223-
HttpResult httpResult = HttpPost(url, request->body, PostHeadersWithAuth());
229+
if (createTableRequest != NULL)
230+
{
231+
HttpResult httpResult = HttpPost(url, createTableRequest->body, PostHeadersWithAuth());
232+
233+
if (httpResult.status != 200)
234+
{
235+
ReportHTTPError(httpResult, WARNING);
224236

225-
if (httpResult.status != 200)
237+
/*
238+
* Ouch, something failed. Should we stop sending the
239+
* requests?
240+
*/
241+
}
242+
}
243+
else if (dropTableRequest != NULL)
226244
{
227-
ReportHTTPError(httpResult, WARNING);
245+
HttpResult httpResult = HttpDelete(url, DeleteHeadersWithAuth());
246+
247+
if (httpResult.status != 204)
248+
{
249+
ReportHTTPError(httpResult, WARNING);
228250

229-
/*
230-
* Ouch, something failed. Should we stop sending the
231-
* requests?
232-
*/
251+
/*
252+
* Ouch, something failed. Should we stop sending the
253+
* requests?
254+
*/
255+
}
256+
}
257+
else
258+
{
259+
pg_unreachable();
233260
}
234261
}
235262
}
@@ -481,7 +508,11 @@ RecordRestCatalogRequestInTx(Oid relationId, RestCatalogOperationType operationT
481508
if (operationType == REST_CATALOG_CREATE_TABLE)
482509
{
483510
request->body = pstrdup(body);
484-
requestPerTable->createTableRequests = list_make1(request);
511+
requestPerTable->createTableRequest = request;
512+
}
513+
else if (operationType == REST_CATALOG_DROP_TABLE)
514+
{
515+
requestPerTable->dropTableRequest = request;
485516
}
486517
else if (operationType == REST_CATALOG_ADD_SNAPSHOT ||
487518
operationType == REST_CATALOG_ADD_SCHEMA ||
@@ -536,7 +567,16 @@ ApplyTrackedIcebergMetadataChanges(void)
536567

537568
/* relation is dropped */
538569
if (!RelationExistsInTheIcebergCatalog(relationId))
570+
{
571+
/*
572+
* if created and dropped in the same tx, treat as no-op and skip
573+
* all
574+
*/
575+
if (!opTracker->relationCreated)
576+
RecordRestCatalogRequestInTx(relationId, REST_CATALOG_DROP_TABLE, NULL);
577+
539578
continue;
579+
}
540580

541581
List *allTransforms = AllPartitionTransformList(relationId);
542582

0 commit comments

Comments
 (0)