Skip to content

Commit 126579e

Browse files
Utils to differentiate internal and external tables
Signed-off-by: Aykut Bozkurt <[email protected]>
1 parent cb222d4 commit 126579e

File tree

11 files changed, with 158 additions and 110 deletions.

pg_lake_engine/include/pg_lake/pgduck/remote_storage.h

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,6 @@ typedef struct RemoteFileDesc
3838

3939
extern PGDLLEXPORT int64 GetRemoteFileSize(char *path);
4040
extern PGDLLEXPORT int64 GetRemoteParquetFileRowCount(char *path);
41-
extern PGDLLEXPORT List *GetRemoteParquetColumnStats(char *path, List *leafFields);
4241
extern PGDLLEXPORT List *ListRemoteFileDescriptions(char *pattern);
4342
extern PGDLLEXPORT List *ListRemoteFileNames(char *pattern);
4443
extern PGDLLEXPORT bool RemoteFileExists(char *path);

pg_lake_engine/include/pg_lake/util/rel_utils.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,7 @@ struct PgLakeTableProperties;
6767
#define PG_LAKE_ICEBERG_SERVER_NAME "pg_lake_iceberg"
6868

6969
extern PGDLLEXPORT bool IsAnyLakeForeignTableById(Oid foreignTableId);
70+
extern PGDLLEXPORT char *GetForeignTablePath(Oid foreignTableId);
7071
extern PGDLLEXPORT char *GetQualifiedRelationName(Oid relationId);
7172
extern PGDLLEXPORT const char *PgLakeTableTypeToName(PgLakeTableType tableType);
7273
extern PGDLLEXPORT PgLakeTableType GetPgLakeTableType(Oid foreignTableId);
@@ -80,7 +81,6 @@ extern PGDLLEXPORT bool IsPgLakeIcebergServerName(const char *serverName);
8081
extern PGDLLEXPORT char *GetWritableTableLocation(Oid relationId, char **queryArguments);
8182
extern PGDLLEXPORT void EnsureTableOwner(Oid relationId);
8283
extern PGDLLEXPORT struct PgLakeTableProperties GetPgLakeTableProperties(Oid relationId);
83-
extern PGDLLEXPORT bool IsInternalOrExternalIcebergTable(struct PgLakeTableProperties properties);
8484

8585
/* range var help */
8686
extern PGDLLEXPORT List *MakeNameListFromRangeVar(const RangeVar *rel);

pg_lake_engine/src/utils/rel_utils.c

Lines changed: 26 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -256,6 +256,32 @@ GetQualifiedRelationName(Oid relationId)
256256
return quote_qualified_identifier(namespaceName, relationName);
257257
}
258258

259+
260+
/*
261+
* GetForeignTablePath - get the path option for the foreign table.
262+
*/
263+
char *
264+
GetForeignTablePath(Oid foreignTableId)
265+
{
266+
ForeignTable *fTable = GetForeignTable(foreignTableId);
267+
ListCell *cell;
268+
269+
foreach(cell, fTable->options)
270+
{
271+
DefElem *defel = (DefElem *) lfirst(cell);
272+
273+
if (strcmp(defel->defname, "path") == 0)
274+
{
275+
return defGetString(defel);
276+
}
277+
}
278+
279+
ereport(ERROR,
280+
(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
281+
errmsg("path option not found for foreign table %u", foreignTableId)));
282+
}
283+
284+
259285
/*
260286
* PgLakeTableTypeToName - convert the PgLakeTableType to a string.
261287
*/
@@ -360,19 +386,6 @@ GetPgLakeTableProperties(Oid relationId)
360386
}
361387

362388

363-
/*
364-
* IsInternalOrExternalIcebergTable - check if the table is an internal or
365-
* external iceberg table.
366-
*/
367-
bool
368-
IsInternalOrExternalIcebergTable(PgLakeTableProperties properties)
369-
{
370-
PgLakeTableType tableType = properties.tableType;
371-
372-
return tableType == PG_LAKE_ICEBERG_TABLE_TYPE ||
373-
(tableType == PG_LAKE_TABLE_TYPE && properties.format == DATA_FORMAT_ICEBERG);
374-
}
375-
376389
/*
377390
* MakeNameListFromRangeVar makes a namelist from a RangeVar. Its behaviour
378391
* should be the exact opposite of postgres' makeRangeVarFromNameList.

pg_lake_iceberg/include/pg_lake/iceberg/catalog.h

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,8 @@
1919

2020
#include "nodes/primnodes.h"
2121

22+
#include "pg_lake/util/rel_utils.h"
23+
2224
/*
2325
* Default location prefix for pg_lake_iceberg tables. Used when the location
2426
* option is not specified at "CREATE FOREIGN SERVER pg_lake_iceberg OPTIONS ()".
@@ -53,6 +55,8 @@ extern PGDLLEXPORT void DeleteInternalIcebergCatalogTable(Oid relationId);
5355

5456
extern PGDLLEXPORT List *GetAllInternalIcebergRelationIds(void);
5557
extern PGDLLEXPORT char *GetIcebergCatalogMetadataLocation(Oid relationId, bool forUpdate);
58+
extern PGDLLEXPORT char *GetIcebergExternalMetadataLocation(Oid relationId);
59+
extern PGDLLEXPORT IcebergCatalogType GetIcebergCatalogType(Oid relationId);
5660
extern PGDLLEXPORT char *GetIcebergCatalogPreviousMetadataLocation(Oid relationId, bool forUpdate);
5761
extern PGDLLEXPORT void UpdateExternalCatalogMetadataLocation(char *catalogName, char *schemaName, char *tableName, const char *metadataLocation,
5862
const char *previousMetadataLocation);

pg_lake_iceberg/include/pg_lake/rest_catalog/rest_catalog.h

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,6 @@ extern char *RestCatalogClientSecret;
3636
extern PGDLLEXPORT char *RestCatalogFetchAccessToken(void);
3737
extern PGDLLEXPORT void RegisterNamespaceToRestCatalog(const char *catalogName, const char *namespaceName);
3838
extern PGDLLEXPORT void ErrorIfRestNamespaceDoesNotExist(const char *catalogName, const char *namespaceName);
39-
extern PGDLLEXPORT IcebergCatalogType GetIcebergCatalogType(Oid relationId);
4039
extern PGDLLEXPORT char *GetRestCatalogName(Oid relationId);
4140
extern PGDLLEXPORT char *GetRestCatalogNamespace(Oid relationId);
4241
extern PGDLLEXPORT char *GetRestCatalogTableName(Oid relationId);

pg_lake_iceberg/src/iceberg/catalog.c

Lines changed: 68 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,13 +18,15 @@
1818
#include "postgres.h"
1919
#include "miscadmin.h"
2020

21-
#include "catalog/namespace.h"
21+
#include "pg_lake/extensions/pg_lake_iceberg.h"
2222
#include "pg_lake/iceberg/catalog.h"
23+
#include "pg_lake/object_store_catalog/object_store_catalog.h"
2324
#include "pg_lake/rest_catalog/rest_catalog.h"
24-
#include "pg_lake/extensions/pg_lake_iceberg.h"
2525
#include "pg_lake/util/rel_utils.h"
2626
#include "pg_lake/util/spi_helpers.h"
27+
#include "catalog/namespace.h"
2728
#include "commands/dbcommands.h"
29+
#include "foreign/foreign.h"
2830
#include "utils/lsyscache.h"
2931
#include "utils/guc.h"
3032

@@ -354,6 +356,70 @@ GetIcebergCatalogMetadataLocation(Oid relationId, bool forUpdate)
354356
return GetIcebergCatalogMetadataLocationInternal(relationId, false, forUpdate);
355357
}
356358

359+
360+
/*
361+
* GetIcebergExternalMetadataLocation returns the metadata location for an external iceberg table.
362+
*/
363+
char *
364+
GetIcebergExternalMetadataLocation(Oid relationId)
365+
{
366+
IcebergCatalogType icebergCatalogType = GetIcebergCatalogType(relationId);
367+
368+
char *currentMetadataPath = NULL;
369+
370+
if (icebergCatalogType == REST_CATALOG_READ_ONLY)
371+
{
372+
currentMetadataPath = GetMetadataLocationForRestCatalogForIcebergTable(relationId);
373+
}
374+
else if (icebergCatalogType == OBJECT_STORE_READ_ONLY)
375+
{
376+
currentMetadataPath = GetMetadataLocationFromExternalObjectStoreCatalogForTable(relationId);
377+
}
378+
else
379+
{
380+
currentMetadataPath = GetForeignTablePath(relationId);
381+
}
382+
383+
return currentMetadataPath;
384+
}
385+
386+
387+
IcebergCatalogType
388+
GetIcebergCatalogType(Oid relationId)
389+
{
390+
if (!IsPgLakeIcebergForeignTableById(relationId))
391+
return NOT_ICEBERG_TABLE;
392+
393+
ForeignTable *foreignTable = GetForeignTable(relationId);
394+
List *options = foreignTable->options;
395+
396+
bool hasRestCatalogOption = HasRestCatalogTableOption(options);
397+
bool hasObjectStoreCatalogOption = HasObjectStoreCatalogTableOption(options);
398+
bool hasReadOnlyOption = HasReadOnlyOption(options);
399+
400+
if (hasRestCatalogOption && hasReadOnlyOption)
401+
{
402+
return REST_CATALOG_READ_ONLY;
403+
}
404+
else if (hasRestCatalogOption && !hasReadOnlyOption)
405+
{
406+
return REST_CATALOG_READ_WRITE;
407+
}
408+
else if (hasObjectStoreCatalogOption && hasReadOnlyOption)
409+
{
410+
return OBJECT_STORE_READ_ONLY;
411+
}
412+
else if (hasObjectStoreCatalogOption && !hasReadOnlyOption)
413+
{
414+
return OBJECT_STORE_READ_WRITE;
415+
}
416+
else
417+
{
418+
return POSTGRES_CATALOG;
419+
}
420+
}
421+
422+
357423
/*
358424
* GetIcebergCatalogPreviousMetadataLocation returns the previous metadata location for a table
359425
* in the iceberg catalog table. Returns NULL if the record is not found.

pg_lake_iceberg/src/rest_catalog/rest_catalog.c

Lines changed: 0 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -449,42 +449,6 @@ EncodeBasicAuth(const char *clientId, const char *clientSecret)
449449
}
450450

451451

452-
IcebergCatalogType
453-
GetIcebergCatalogType(Oid relationId)
454-
{
455-
if (!IsPgLakeIcebergForeignTableById(relationId))
456-
return NOT_ICEBERG_TABLE;
457-
458-
ForeignTable *foreignTable = GetForeignTable(relationId);
459-
List *options = foreignTable->options;
460-
461-
bool hasRestCatalogOption = HasRestCatalogTableOption(options);
462-
bool hasObjectStoreCatalogOption = HasObjectStoreCatalogTableOption(options);
463-
bool hasReadOnlyOption = HasReadOnlyOption(options);
464-
465-
if (hasRestCatalogOption && hasReadOnlyOption)
466-
{
467-
return REST_CATALOG_READ_ONLY;
468-
}
469-
else if (hasRestCatalogOption && !hasReadOnlyOption)
470-
{
471-
return REST_CATALOG_READ_WRITE;
472-
}
473-
else if (hasObjectStoreCatalogOption && hasReadOnlyOption)
474-
{
475-
return OBJECT_STORE_READ_ONLY;
476-
}
477-
else if (hasObjectStoreCatalogOption && !hasReadOnlyOption)
478-
{
479-
return OBJECT_STORE_READ_WRITE;
480-
}
481-
else
482-
{
483-
return POSTGRES_CATALOG;
484-
}
485-
}
486-
487-
488452
/*
489453
* Readable rest catalog tables always use the catalog_table_name option
490454
* as the table name in the external catalog. Writable rest catalog tables

pg_lake_table/include/pg_lake/fdw/utils.h

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,8 +33,10 @@
3333
#include "pg_lake/copy/copy_format.h"
3434

3535
extern PGDLLEXPORT bool IsAnyLakeForeignTable(RangeTblEntry *rte);
36-
extern PGDLLEXPORT char *GetForeignTablePath(Oid foreignTableId);
3736
extern PGDLLEXPORT CopyDataFormat GetForeignTableFormat(Oid foreignTableId);
37+
extern PGDLLEXPORT bool IsAnyIcebergTable(Oid relationId);
38+
extern PGDLLEXPORT bool IsAnyInternalIcebergTable(Oid relationId);
39+
extern PGDLLEXPORT bool IsAnyExternalIcebergTable(Oid relationId);
3840
extern PGDLLEXPORT bool IsWritablePgLakeTable(Oid relationId);
3941
extern PGDLLEXPORT void ErrorIfTypeUnsupportedForIcebergTables(Oid typeOid, int32 typmod, char *columnName);
4042
extern PGDLLEXPORT void ErrorIfTypeUnsupportedNumericForIcebergTables(int32 typmod, char *columnName);

pg_lake_table/src/fdw/data_file_pruning.c

Lines changed: 7 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -50,10 +50,12 @@
5050
#include "pg_lake/extensions/postgis.h"
5151
#include "pg_lake/fdw/schema_operations/field_id_mapping_catalog.h"
5252
#include "pg_lake/fdw/schema_operations/register_field_ids.h"
53+
#include "pg_lake/fdw/utils.h"
5354
#include "pg_lake/fdw/data_file_pruning.h"
5455
#include "pg_lake/fdw/partition_transform.h"
5556
#include "pg_lake/fdw/writable_table.h"
5657
#include "pg_lake/iceberg/api/table_schema.h"
58+
#include "pg_lake/iceberg/catalog.h"
5759
#include "pg_lake/iceberg/data_file_stats.h"
5860
#include "pg_lake/iceberg/partitioning/partition.h"
5961
#include "pg_lake/iceberg/temporal_utils.h"
@@ -183,11 +185,9 @@ PruneDataFiles(Oid relationId, List *dataFiles, List *baseRestrictInfoList, Prun
183185
List *columnsUsedInFilters = ColumnsUsedInRestrictions(relationId, baseRestrictInfoList);
184186
List *partitionTransforms = AllPartitionTransformList(relationId);
185187
PgLakeTableProperties tableProperties = GetPgLakeTableProperties(relationId);
186-
PgLakeTableType tableType = tableProperties.tableType;
187-
IcebergCatalogType icebergCatalogType = GetIcebergCatalogType(relationId);
188188

189189
if ((!EnableDataFilePruning && !EnablePartitionPruning) ||
190-
!IsInternalOrExternalIcebergTable(tableProperties))
190+
!IsAnyIcebergTable(relationId))
191191
{
192192
/*
193193
* User disabled or no columns used in filters, so we cannot prune any
@@ -239,20 +239,15 @@ PruneDataFiles(Oid relationId, List *dataFiles, List *baseRestrictInfoList, Prun
239239
List *columnStats = NIL;
240240
Partition *partition = NULL;
241241

242-
if (tableType == PG_LAKE_ICEBERG_TABLE_TYPE &&
243-
(icebergCatalogType == POSTGRES_CATALOG || icebergCatalogType == REST_CATALOG_READ_WRITE ||
244-
icebergCatalogType == OBJECT_STORE_READ_WRITE))
242+
if (IsAnyInternalIcebergTable(relationId))
245243
{
246244
TableDataFile *tableDataFile = (TableDataFile *) list_nth(dataFiles, dataFileIndex);
247245

248246
Assert(tableDataFile->content == CONTENT_DATA);
249247
columnStats = tableDataFile->stats.columnStats;
250248
partition = tableDataFile->partition;
251249
}
252-
else if ((tableType == PG_LAKE_TABLE_TYPE && tableProperties.format == DATA_FORMAT_ICEBERG) ||
253-
(tableType == PG_LAKE_ICEBERG_TABLE_TYPE &&
254-
(icebergCatalogType == REST_CATALOG_READ_ONLY ||
255-
icebergCatalogType == OBJECT_STORE_READ_ONLY)))
250+
else if (IsAnyExternalIcebergTable(relationId))
256251
{
257252
DataFile *dataFile = (DataFile *) list_nth(dataFiles, dataFileIndex);
258253

@@ -334,7 +329,6 @@ AddFieldIdsUsedInQuery(HTAB *fieldIdsUsedInQuery, Oid relationId, PgLakeTablePro
334329
List *columnsUsedInFilters)
335330
{
336331
PgLakeTableType tableType = tableProperties.tableType;
337-
IcebergCatalogType icebergCatalogType = GetIcebergCatalogType(relationId);
338332

339333
List *attrNos = NIL;
340334
ListCell *columnCell = NULL;
@@ -350,9 +344,7 @@ AddFieldIdsUsedInQuery(HTAB *fieldIdsUsedInQuery, Oid relationId, PgLakeTablePro
350344
/* fetch the field mappings for all columns in a single catalog lookup */
351345
List *fields = NIL;
352346

353-
if (tableType == PG_LAKE_ICEBERG_TABLE_TYPE &&
354-
(icebergCatalogType == POSTGRES_CATALOG || icebergCatalogType == REST_CATALOG_READ_WRITE ||
355-
icebergCatalogType == OBJECT_STORE_READ_WRITE))
347+
if (IsAnyInternalIcebergTable(relationId))
356348
{
357349
fields = GetRegisteredFieldForAttributes(relationId, attrNos);
358350

@@ -362,9 +354,7 @@ AddFieldIdsUsedInQuery(HTAB *fieldIdsUsedInQuery, Oid relationId, PgLakeTablePro
362354
*/
363355
Assert(list_length(fields) == list_length(columnsUsedInFilters));
364356
}
365-
else if ((tableType == PG_LAKE_TABLE_TYPE && tableProperties.format == DATA_FORMAT_ICEBERG) ||
366-
(tableType == PG_LAKE_ICEBERG_TABLE_TYPE && (icebergCatalogType == REST_CATALOG_READ_ONLY ||
367-
icebergCatalogType == OBJECT_STORE_READ_ONLY)))
357+
else if (IsAnyExternalIcebergTable(relationId))
368358
{
369359
fields = GetExternalIcebergFieldsForAttributes(relationId, columnsUsedInFilters);
370360

pg_lake_table/src/fdw/schema_operations/register_field_ids.c

Lines changed: 1 addition & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -237,21 +237,9 @@ CreatePostgresColumnMappingsForColumnDefs(Oid relationId, List *columnDefList, b
237237
List *
238238
CreatePostgresColumnMappingsForIcebergTableFromExternalMetadata(Oid relationId)
239239
{
240-
char *currentMetadataPath = NULL;
241240
IcebergCatalogType icebergCatalogType = GetIcebergCatalogType(relationId);
242241

243-
if (icebergCatalogType == REST_CATALOG_READ_ONLY)
244-
{
245-
currentMetadataPath = GetMetadataLocationForRestCatalogForIcebergTable(relationId);
246-
}
247-
else if (icebergCatalogType == OBJECT_STORE_READ_ONLY)
248-
{
249-
currentMetadataPath = GetMetadataLocationFromExternalObjectStoreCatalogForTable(relationId);
250-
}
251-
else
252-
{
253-
currentMetadataPath = GetIcebergCatalogMetadataLocation(relationId, true);
254-
}
242+
char *currentMetadataPath = GetIcebergExternalMetadataLocation(relationId);
255243

256244
DataFileSchema *schema = GetDataFileSchemaForExternalIcebergTable(currentMetadataPath);
257245

0 commit comments

Comments
 (0)