Skip to content

Commit 2973ec2

Browse files
Support pruning for external tables
Signed-off-by: Aykut Bozkurt <[email protected]>
1 parent 22c7548 commit 2973ec2

File tree

6 files changed

+211
-7
lines changed

6 files changed

+211
-7
lines changed

pg_lake_iceberg/include/pg_lake/iceberg/api/table_schema.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ extern PGDLLEXPORT IcebergTableSchema * GetIcebergTableSchemaByIdFromTableMetada
2727
extern PGDLLEXPORT IcebergTableSchema * GetCurrentIcebergTableSchema(IcebergTableMetadata * metadata);
2828
extern PGDLLEXPORT List *GetLeafFieldsFromIcebergMetadata(IcebergTableMetadata * metadata);
2929
extern PGDLLEXPORT List *GetLeafFieldsForIcebergSchema(IcebergTableSchema * schema);
30+
extern PGDLLEXPORT DataFileSchemaField * GetDataFileSchemaFieldById(DataFileSchema * schema, int fieldId);
3031

3132
/* write api */
3233
extern PGDLLEXPORT IcebergTableSchema * RebuildIcebergSchemaFromDataFileSchema(Oid foreignTableOid,

pg_lake_iceberg/src/iceberg/api/table_schema.c

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,38 @@ GetLeafFieldsForIcebergSchema(IcebergTableSchema * schema)
169169
}
170170

171171

172+
/*
173+
* GetDataFileSchemaFieldById gets the DataFileSchemaField for the given
174+
* iceberg field id.
175+
*/
176+
DataFileSchemaField *
177+
GetDataFileSchemaFieldById(DataFileSchema * schema, int fieldId)
178+
{
179+
DataFileSchemaField *schemaField = NULL;
180+
181+
for (size_t fieldIdx = 0; fieldIdx < schema->nfields; fieldIdx++)
182+
{
183+
DataFileSchemaField *field = &schema->fields[fieldIdx];
184+
185+
if (field->id == fieldId)
186+
{
187+
schemaField = field;
188+
break;
189+
}
190+
}
191+
192+
if (schemaField == NULL)
193+
{
194+
ereport(ERROR,
195+
(errcode(ERRCODE_INTERNAL_ERROR),
196+
errmsg("field ID %d not found",
197+
fieldId)));
198+
}
199+
200+
return schemaField;
201+
}
202+
203+
172204
/*
173205
* GetLeafFieldsForField returns the leaf fields for the given field.
174206
* It recursively traverses the field tree and collects all leaf fields, and

pg_lake_table/include/pg_lake/fdw/partition_transform.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ extern void *ApplyBucketTransformToColumn(IcebergPartitionTransform * transform,
2929
size_t *bucketSize);
3030
extern List *CurrentPartitionTransformList(Oid relationId);
3131
extern List *AllPartitionTransformList(Oid relationId);
32+
extern List *PartitionTransformListForSourceFields(Oid relationId, HTAB *sourceFieldsHash);
3233
extern List *GetPartitionTransformsFromSpecFields(Oid relationId, List *specFields);
3334
extern void *DeserializePartitionValueFromPGText(IcebergPartitionTransform * transform,
3435
const char *valueText, size_t *valueLength);

pg_lake_table/src/fdw/data_file_pruning.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@ PruneDataFiles(Oid relationId, List *dataFiles, List *baseRestrictInfoList, Prun
182182
{
183183
List *retainedDataFiles = NIL;
184184
List *columnsUsedInFilters = ColumnsUsedInRestrictions(relationId, baseRestrictInfoList);
185-
List *partitionTransforms = AllPartitionTransformList(relationId);
186185
PgLakeTableProperties tableProperties = GetPgLakeTableProperties(relationId);
187186

188187
if ((!EnableDataFilePruning && !EnablePartitionPruning) ||
@@ -230,6 +229,9 @@ PruneDataFiles(Oid relationId, List *dataFiles, List *baseRestrictInfoList, Prun
230229

231230
AddFieldIdsUsedInQuery(fieldIdsUsedInQuery, relationId, tableProperties, columnsUsedInFilters);
232231

232+
/* get partition transforms for the fields used in the query */
233+
List *partitionTransforms = PartitionTransformListForSourceFields(relationId, fieldIdsUsedInQuery);
234+
233235
int dataFileCount = list_length(dataFiles);
234236

235237
for (int dataFileIndex = 0; dataFileIndex < dataFileCount; ++dataFileIndex)
@@ -254,6 +256,7 @@ PruneDataFiles(Oid relationId, List *dataFiles, List *baseRestrictInfoList, Prun
254256
char *dataFilePath = (char *) ((DataFile *) dataFile)->file_path;
255257

256258
columnStats = GetRemoteParquetColumnStats(dataFilePath, leafFields);
259+
partition = CopyPartition(&dataFile->partition);
257260
}
258261
else
259262
{

pg_lake_table/src/fdw/partition_transform.c

Lines changed: 99 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
#include "pg_lake/pgduck/numeric.h"
2626
#include "pg_lake/fdw/partition_transform.h"
2727
#include "pg_lake/fdw/schema_operations/field_id_mapping_catalog.h"
28+
#include "pg_lake/fdw/schema_operations/register_field_ids.h"
2829
#include "pg_lake/iceberg/api/partitioning.h"
30+
#include "pg_lake/iceberg/catalog.h"
2931
#include "pg_lake/iceberg/iceberg_field.h"
3032
#include "pg_lake/iceberg/iceberg_type_binary_serde.h"
3133
#include "pg_lake/iceberg/iceberg_type_numeric_binary_serde.h"
@@ -35,6 +37,7 @@
3537
#include "pg_lake/iceberg/hash_utils.h"
3638
#include "pg_lake/iceberg/truncate_utils.h"
3739
#include "pg_lake/util/numeric.h"
40+
#include "pg_lake/util/rel_utils.h"
3841

3942
static PartitionField * ApplyPartitionTransformToTuple(IcebergPartitionTransform * transform,
4043
TupleTableSlot *slot);
@@ -63,6 +66,8 @@ static void ParseTransformName(const char *name, IcebergPartitionTransformType *
6366
static bool ParseBracketUintSize(const char *name, const char *prefix, size_t *outVal);
6467
static void *DatumToPartitionValue(IcebergPartitionTransform * transform, Datum columnValue, bool isNull,
6568
size_t *valuelength);
69+
static List *AllPartitionSpecFieldsForExternalIcebergTable(char *metadataPath);
70+
static List *GetAllPartitionSpecFields(Oid relationId);
6671

6772
/*
6873
* ComputePartitionTupleForTuple applies relative partition transforms
@@ -125,18 +130,94 @@ CurrentPartitionTransformList(Oid relationId)
125130
List *
126131
AllPartitionTransformList(Oid relationId)
127132
{
128-
List *partitionFields = GetAllIcebergSpecPartitionFieldsFromCatalog(relationId);
133+
List *partitionFields = GetAllPartitionSpecFields(relationId);
129134

130-
if (partitionFields == NIL)
135+
return GetPartitionTransformsFromSpecFields(relationId, partitionFields);
136+
}
137+
138+
139+
/*
140+
* PartitionTransformListForSourceFields gets the partition transforms for only the fields
141+
* present in the sourceFieldsHash.
142+
*/
143+
List *
144+
PartitionTransformListForSourceFields(Oid relationId, HTAB *sourceFieldsHash)
145+
{
146+
List *partitionFields = GetAllPartitionSpecFields(relationId);
147+
148+
List *filteredPartitionFields = NIL;
149+
ListCell *filteredPartitionFieldCell = NULL;
150+
151+
foreach(filteredPartitionFieldCell, partitionFields)
131152
{
132-
/* not partitioned */
133-
return NIL;
153+
IcebergPartitionSpecField *specField = lfirst(filteredPartitionFieldCell);
154+
155+
bool found = false;
156+
157+
hash_search(sourceFieldsHash, &specField->source_id, HASH_FIND, &found);
158+
159+
if (found)
160+
{
161+
filteredPartitionFields = lappend(filteredPartitionFields, specField);
162+
}
134163
}
135164

136-
return GetPartitionTransformsFromSpecFields(relationId, partitionFields);
165+
return GetPartitionTransformsFromSpecFields(relationId, filteredPartitionFields);
137166
}
138167

139168

169+
/*
170+
* GetAllPartitionSpecFields gets all the partition spec fields for the given
171+
* relationId.
172+
*/
173+
static List *
174+
GetAllPartitionSpecFields(Oid relationId)
175+
{
176+
List *partitionFields = NIL;
177+
178+
if (IsInternalIcebergTable(relationId))
179+
{
180+
/* internal iceberg table, get from catalog */
181+
partitionFields = GetAllIcebergSpecPartitionFieldsFromCatalog(relationId);
182+
}
183+
else if (IsExternalIcebergTable(relationId))
184+
{
185+
/* external iceberg table, get from remote metadata */
186+
char *currentMetadataPath = GetIcebergMetadataLocation(relationId, false);
187+
188+
partitionFields = AllPartitionSpecFieldsForExternalIcebergTable(currentMetadataPath);
189+
}
190+
191+
return partitionFields;
192+
}
193+
194+
195+
/*
196+
* AllPartitionSpecFieldsForExternalIcebergTable gets all the partition spec fields
197+
* for the external Iceberg table.
198+
*/
199+
static List *
200+
AllPartitionSpecFieldsForExternalIcebergTable(char *metadataPath)
201+
{
202+
IcebergTableMetadata *metadata = ReadIcebergTableMetadata(metadataPath);
203+
204+
List *allPartitionFields = NIL;
205+
206+
for (int specIdx = 0; specIdx < metadata->partition_specs_length; specIdx++)
207+
{
208+
IcebergPartitionSpec *spec = &metadata->partition_specs[specIdx];
209+
210+
for (int partitionFieldIdx = 0; partitionFieldIdx < spec->fields_length; partitionFieldIdx++)
211+
{
212+
IcebergPartitionSpecField *field = &spec->fields[partitionFieldIdx];
213+
214+
allPartitionFields = lappend(allPartitionFields, field);
215+
}
216+
}
217+
218+
return allPartitionFields;
219+
}
220+
140221

141222
/*
142223
* GetPartitionTransformsFromSpecFields is a wrapper around
@@ -184,7 +265,19 @@ GetPartitionTransformFromSpecField(Oid relationId, IcebergPartitionSpecField * s
184265
GetAttributeForFieldId(relationId, specField->source_id);
185266
transform->columnName = get_attname(relationId, transform->attnum, false);
186267
transform->pgType = GetAttributePGType(relationId, transform->attnum);
187-
transform->sourceField = GetRegisteredFieldForAttribute(relationId, transform->attnum);
268+
269+
if (IsInternalIcebergTable(relationId))
270+
{
271+
transform->sourceField = GetRegisteredFieldForAttribute(relationId, transform->attnum);
272+
}
273+
else
274+
{
275+
Assert(IsExternalIcebergTable(relationId));
276+
277+
DataFileSchema *schema = GetDataFileSchemaForTable(relationId);
278+
279+
transform->sourceField = GetDataFileSchemaFieldById(schema, specField->source_id);
280+
}
188281

189282
/* parse transform name */
190283
ParseTransformName(transform->transformName,

pg_lake_table/src/fdw/schema_operations/field_id_mapping_catalog.c

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,23 @@
4141
#include "pg_lake/fdw/schema_operations/field_id_mapping_catalog.h"
4242
#include "pg_lake/fdw/schema_operations/register_field_ids.h"
4343
#include "pg_lake/iceberg/api/table_metadata.h"
44+
#include "pg_lake/iceberg/api/table_schema.h"
4445
#include "pg_lake/iceberg/catalog.h"
4546
#include "pg_lake/iceberg/iceberg_field.h"
4647
#include "pg_lake/parquet/leaf_field.h"
4748
#include "pg_lake/pgduck/map.h"
4849
#include "pg_lake/pgduck/serialize.h"
4950
#include "pg_lake/util/array_utils.h"
51+
#include "pg_lake/util/rel_utils.h"
5052
#include "pg_lake/util/spi_helpers.h"
5153

5254
static DataFileSchemaField * CreateRegisteredFieldForAttribute(Oid relationId, int spiIndex);
5355
static void InsertFieldMapping(Oid relationId, int attrIcebergFieldId,
5456
AttrNumber pg_attnum, PGType pgType,
5557
const char *writeDefault, const char *initialDefault,
5658
int parentFieldId);
59+
static AttrNumber GetAttributeForFieldIdForInternalIcebergTable(Oid relationId, int fieldId);
60+
static AttrNumber GetAttributeForFieldIdForExternalIcebergTable(char *metadataPath, Oid relationId, int fieldId);
5761

5862
#ifdef USE_ASSERT_CHECKING
5963
static List *GetAllRegisteredAttnumsForTopLevelColumns(Oid relationId);
@@ -244,6 +248,28 @@ GetRegisteredFieldForAttribute(Oid relationId, AttrNumber attrNo)
244248
*/
245249
AttrNumber
246250
GetAttributeForFieldId(Oid relationId, int fieldId)
251+
{
252+
if (IsInternalIcebergTable(relationId))
253+
{
254+
return GetAttributeForFieldIdForInternalIcebergTable(relationId, fieldId);
255+
}
256+
else
257+
{
258+
Assert(IsExternalIcebergTable(relationId));
259+
260+
char *currentMetadataPath = GetIcebergMetadataLocation(relationId, false);
261+
262+
return GetAttributeForFieldIdForExternalIcebergTable(currentMetadataPath, relationId, fieldId);
263+
}
264+
}
265+
266+
267+
/*
268+
* GetAttributeForFieldIdForInternalIcebergTable gets the attribute number for a given field ID
269+
* for internal Iceberg tables from catalog.
270+
*/
271+
static AttrNumber
272+
GetAttributeForFieldIdForInternalIcebergTable(Oid relationId, int fieldId)
247273
{
248274
DECLARE_SPI_ARGS(2);
249275

@@ -280,6 +306,54 @@ GetAttributeForFieldId(Oid relationId, int fieldId)
280306
}
281307

282308

309+
/*
310+
* GetAttributeForFieldIdForExternalIcebergTable gets the attribute number for a given field ID
311+
* for external Iceberg tables from iceberg metadata.
312+
*/
313+
static AttrNumber
314+
GetAttributeForFieldIdForExternalIcebergTable(char *metadataPath, Oid relationId, int fieldId)
315+
{
316+
DataFileSchema *schema = GetDataFileSchemaForExternalIcebergTable(metadataPath);
317+
318+
DataFileSchemaField *schemaField = GetDataFileSchemaFieldById(schema, fieldId);
319+
320+
char *attrName = pstrdup(schemaField->name);
321+
322+
DECLARE_SPI_ARGS(2);
323+
324+
SPI_ARG_VALUE(1, OIDOID, relationId, false);
325+
SPI_ARG_VALUE(2, TEXTOID, attrName, false);
326+
327+
SPI_START();
328+
329+
/*
330+
* Although this is a read-only query, we need the execution to use the
331+
* current transaction's snapshot (e.g., GetTransactionSnapshot()) to get
332+
* the snapshot that the current transaction modified.
333+
*
334+
* So we trick the SPI_EXECUTE function to think that the query is not
335+
* read-only and read the transaction snapshot.
336+
*/
337+
bool readOnly = false;
338+
339+
SPI_EXECUTE("SELECT attnum FROM pg_attribute "
340+
"WHERE attrelid OPERATOR(pg_catalog.=) $1 "
341+
"AND attname OPERATOR(pg_catalog.=) $2 "
342+
"AND NOT attisdropped", readOnly);
343+
344+
/* there is a primary key on these filters */
345+
Assert(SPI_processed == 1);
346+
347+
bool isNull = false;
348+
AttrNumber attrNo = GET_SPI_VALUE(INT2OID, 0, 1, &isNull);
349+
350+
Assert(!isNull);
351+
352+
SPI_END();
353+
354+
return attrNo;
355+
}
356+
283357

284358
/*
285359
* GetDataFileSchemaForInternalIcebergTable gets a table schema based on the MAPPING_TABLE_NAME.

0 commit comments

Comments
 (0)