Skip to content

Commit f2540e4

Browse files
Support pruning for external tables
Signed-off-by: Aykut Bozkurt <[email protected]>
1 parent 8b40701 commit f2540e4

File tree

5 files changed

+165
-4
lines changed

5 files changed

+165
-4
lines changed

pg_lake_iceberg/include/pg_lake/iceberg/api/table_schema.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@ extern PGDLLEXPORT IcebergTableSchema * GetIcebergTableSchemaByIdFromTableMetada
2727
extern PGDLLEXPORT IcebergTableSchema * GetCurrentIcebergTableSchema(IcebergTableMetadata * metadata);
2828
extern PGDLLEXPORT List *GetLeafFieldsFromIcebergMetadata(IcebergTableMetadata * metadata);
2929
extern PGDLLEXPORT List *GetLeafFieldsForIcebergSchema(IcebergTableSchema * schema);
30+
extern PGDLLEXPORT DataFileSchemaField * GetDataFileSchemaFieldById(DataFileSchema * schema, int fieldId);
3031

3132
/* write api */
3233
extern PGDLLEXPORT IcebergTableSchema * RebuildIcebergSchemaFromDataFileSchema(Oid foreignTableOid,

pg_lake_iceberg/src/iceberg/api/table_schema.c

Lines changed: 32 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -169,6 +169,38 @@ GetLeafFieldsForIcebergSchema(IcebergTableSchema * schema)
169169
}
170170

171171

172+
/*
173+
* GetDataFileSchemaFieldById gets the DataFileSchemaField for the given
174+
* iceberg field id.
175+
*/
176+
DataFileSchemaField *
177+
GetDataFileSchemaFieldById(DataFileSchema * schema, int fieldId)
178+
{
179+
DataFileSchemaField *schemaField = NULL;
180+
181+
for (size_t fieldIdx = 0; fieldIdx < schema->nfields; fieldIdx++)
182+
{
183+
DataFileSchemaField *field = &schema->fields[fieldIdx];
184+
185+
if (field->id == fieldId)
186+
{
187+
schemaField = field;
188+
break;
189+
}
190+
}
191+
192+
if (schemaField == NULL)
193+
{
194+
ereport(ERROR,
195+
(errcode(ERRCODE_INTERNAL_ERROR),
196+
errmsg("field ID %d not found",
197+
fieldId)));
198+
}
199+
200+
return schemaField;
201+
}
202+
203+
172204
/*
173205
* GetLeafFieldsForField returns the leaf fields for the given field.
174206
* It recursively traverses the field tree and collects all leaf fields, and

pg_lake_table/src/fdw/data_file_pruning.c

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -254,6 +254,7 @@ PruneDataFiles(Oid relationId, List *dataFiles, List *baseRestrictInfoList, Prun
254254
char *dataFilePath = (char *) ((DataFile *) dataFile)->file_path;
255255

256256
columnStats = GetRemoteParquetColumnStats(dataFilePath, leafFields);
257+
partition = CopyPartition(&dataFile->partition);
257258
}
258259
else
259260
{

pg_lake_table/src/fdw/partition_transform.c

Lines changed: 57 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,9 @@
2525
#include "pg_lake/pgduck/numeric.h"
2626
#include "pg_lake/fdw/partition_transform.h"
2727
#include "pg_lake/fdw/schema_operations/field_id_mapping_catalog.h"
28+
#include "pg_lake/fdw/schema_operations/register_field_ids.h"
2829
#include "pg_lake/iceberg/api/partitioning.h"
30+
#include "pg_lake/iceberg/catalog.h"
2931
#include "pg_lake/iceberg/iceberg_field.h"
3032
#include "pg_lake/iceberg/iceberg_type_binary_serde.h"
3133
#include "pg_lake/iceberg/iceberg_type_numeric_binary_serde.h"
@@ -35,6 +37,7 @@
3537
#include "pg_lake/iceberg/hash_utils.h"
3638
#include "pg_lake/iceberg/truncate_utils.h"
3739
#include "pg_lake/util/numeric.h"
40+
#include "pg_lake/util/rel_utils.h"
3841

3942
static PartitionField * ApplyPartitionTransformToTuple(IcebergPartitionTransform * transform,
4043
TupleTableSlot *slot);
@@ -63,6 +66,7 @@ static void ParseTransformName(const char *name, IcebergPartitionTransformType *
6366
static bool ParseBracketUintSize(const char *name, const char *prefix, size_t *outVal);
6467
static void *DatumToPartitionValue(IcebergPartitionTransform * transform, Datum columnValue, bool isNull,
6568
size_t *valuelength);
69+
static List *AllPartitionSpecFieldsForExternalIcebergTable(char *metadataPath);
6670

6771
/*
6872
* ComputePartitionTupleForTuple applies relative partition transforms
@@ -125,18 +129,55 @@ CurrentPartitionTransformList(Oid relationId)
125129
List *
126130
AllPartitionTransformList(Oid relationId)
127131
{
128-
List *partitionFields = GetAllIcebergSpecPartitionFieldsFromCatalog(relationId);
132+
List *partitionFields = NIL;
129133

130-
if (partitionFields == NIL)
134+
if (IsInternalIcebergTable(relationId))
135+
{
136+
/* internal Iceberg table: the partition spec fields are read from the catalog */
137+
partitionFields = GetAllIcebergSpecPartitionFieldsFromCatalog(relationId);
138+
}
139+
else if (IsExternalIcebergTable(relationId))
131140
{
141+
/* external Iceberg table: the partition spec fields are read from the table's remote Iceberg metadata */
142+
char *currentMetadataPath = GetIcebergMetadataLocation(relationId, false);
143+
144+
partitionFields = AllPartitionSpecFieldsForExternalIcebergTable(currentMetadataPath);
145+
}
146+
147+
if (partitionFields == NIL)
132148
/* no partition spec fields were found (or the table is neither internal nor external Iceberg): not partitioned */
133149
return NIL;
134-
}
135150

136151
return GetPartitionTransformsFromSpecFields(relationId, partitionFields);
137152
}
138153

139154

155+
/*
156+
* AllPartitionSpecFieldsForExternalIcebergTable gets all the partition spec fields
157+
* for the external Iceberg table.
158+
*/
159+
static List *
160+
AllPartitionSpecFieldsForExternalIcebergTable(char *metadataPath)
161+
{
162+
IcebergTableMetadata *metadata = ReadIcebergTableMetadata(metadataPath);
163+
164+
List *allPartitionFields = NIL;
165+
166+
for (int specIdx = 0; specIdx < metadata->partition_specs_length; specIdx++)
167+
{
168+
IcebergPartitionSpec *spec = &metadata->partition_specs[specIdx];
169+
170+
for (int partitionFieldIdx = 0; partitionFieldIdx < spec->fields_length; partitionFieldIdx++)
171+
{
172+
IcebergPartitionSpecField *field = &spec->fields[partitionFieldIdx];
173+
174+
allPartitionFields = lappend(allPartitionFields, field);
175+
}
176+
}
177+
178+
return allPartitionFields;
179+
}
180+
140181

141182
/*
142183
* GetPartitionTransformsFromSpecFields is a wrapper around
@@ -184,7 +225,19 @@ GetPartitionTransformFromSpecField(Oid relationId, IcebergPartitionSpecField * s
184225
GetAttributeForFieldId(relationId, specField->source_id);
185226
transform->columnName = get_attname(relationId, transform->attnum, false);
186227
transform->pgType = GetAttributePGType(relationId, transform->attnum);
187-
transform->sourceField = GetRegisteredFieldForAttribute(relationId, transform->attnum);
228+
229+
if (IsInternalIcebergTable(relationId))
230+
{
231+
transform->sourceField = GetRegisteredFieldForAttribute(relationId, transform->attnum);
232+
}
233+
else
234+
{
235+
Assert(IsExternalIcebergTable(relationId));
236+
237+
DataFileSchema *schema = GetDataFileSchemaForTable(relationId);
238+
239+
transform->sourceField = GetDataFileSchemaFieldById(schema, specField->source_id);
240+
}
188241

189242
/* parse transform name */
190243
ParseTransformName(transform->transformName,

pg_lake_table/src/fdw/schema_operations/field_id_mapping_catalog.c

Lines changed: 74 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,19 +41,23 @@
4141
#include "pg_lake/fdw/schema_operations/field_id_mapping_catalog.h"
4242
#include "pg_lake/fdw/schema_operations/register_field_ids.h"
4343
#include "pg_lake/iceberg/api/table_metadata.h"
44+
#include "pg_lake/iceberg/api/table_schema.h"
4445
#include "pg_lake/iceberg/catalog.h"
4546
#include "pg_lake/iceberg/iceberg_field.h"
4647
#include "pg_lake/parquet/leaf_field.h"
4748
#include "pg_lake/pgduck/map.h"
4849
#include "pg_lake/pgduck/serialize.h"
4950
#include "pg_lake/util/array_utils.h"
51+
#include "pg_lake/util/rel_utils.h"
5052
#include "pg_lake/util/spi_helpers.h"
5153

5254
static DataFileSchemaField * CreateRegisteredFieldForAttribute(Oid relationId, int spiIndex);
5355
static void InsertFieldMapping(Oid relationId, int attrIcebergFieldId,
5456
AttrNumber pg_attnum, PGType pgType,
5557
const char *writeDefault, const char *initialDefault,
5658
int parentFieldId);
59+
static AttrNumber GetAttributeForFieldIdForInternalIcebergTable(Oid relationId, int fieldId);
60+
static AttrNumber GetAttributeForFieldIdForExternalIcebergTable(char *metadataPath, Oid relationId, int fieldId);
5761

5862
#ifdef USE_ASSERT_CHECKING
5963
static List *GetAllRegisteredAttnumsForTopLevelColumns(Oid relationId);
@@ -244,6 +248,28 @@ GetRegisteredFieldForAttribute(Oid relationId, AttrNumber attrNo)
244248
*/
245249
AttrNumber
246250
GetAttributeForFieldId(Oid relationId, int fieldId)
251+
{
252+
if (IsInternalIcebergTable(relationId))
253+
{
254+
return GetAttributeForFieldIdForInternalIcebergTable(relationId, fieldId, inCurrentSchema);
255+
}
256+
else
257+
{
258+
Assert(IsExternalIcebergTable(relationId));
259+
260+
char *currentMetadataPath = GetIcebergMetadataLocation(relationId, false);
261+
262+
return GetAttributeForFieldIdForExternalIcebergTable(currentMetadataPath, relationId, fieldId);
263+
}
264+
}
265+
266+
267+
/*
268+
* GetAttributeForFieldIdForInternalIcebergTable gets the attribute number for a given field ID
269+
* for internal Iceberg tables from catalog.
270+
*/
271+
static AttrNumber
272+
GetAttributeForFieldIdForInternalIcebergTable(Oid relationId, int fieldId)
247273
{
248274
DECLARE_SPI_ARGS(2);
249275

@@ -280,6 +306,54 @@ GetAttributeForFieldId(Oid relationId, int fieldId)
280306
}
281307

282308

309+
/*
310+
* GetAttributeForFieldIdForExternalIcebergTable gets the attribute number for a given field ID
311+
* for external Iceberg tables from iceberg metadata.
312+
*/
313+
static AttrNumber
314+
GetAttributeForFieldIdForExternalIcebergTable(char *metadataPath, Oid relationId, int fieldId)
315+
{
316+
DataFileSchema *schema = GetDataFileSchemaForExternalIcebergTable(metadataPath);
317+
318+
DataFileSchemaField *schemaField = GetDataFileSchemaFieldById(schema, fieldId);
319+
320+
char *attrName = pstrdup(schemaField->name);
321+
322+
DECLARE_SPI_ARGS(2);
323+
324+
SPI_ARG_VALUE(1, OIDOID, relationId, false);
325+
SPI_ARG_VALUE(2, TEXTOID, attrName, false);
326+
327+
SPI_START();
328+
329+
/*
330+
* Although this is a read-only query, we need the execution to use the
331+
* current transaction's snapshot (e.g., GetTransactionSnapshot()) to get
332+
* the snapshot that the current transaction modified.
333+
*
334+
* So we trick the SPI_EXECUTE function to think that the query is not
335+
* read-only and read the transaction snapshot.
336+
*/
337+
bool readOnly = false;
338+
339+
SPI_EXECUTE("SELECT attnum FROM pg_attribute "
340+
"WHERE attrelid OPERATOR(pg_catalog.=) $1 "
341+
"AND attname OPERATOR(pg_catalog.=) $2 "
342+
"AND NOT attisdropped", readOnly);
343+
344+
/* there is a primary key on these filters */
345+
Assert(SPI_processed == 1);
346+
347+
bool isNull = false;
348+
AttrNumber attrNo = GET_SPI_VALUE(INT2OID, 0, 1, &isNull);
349+
350+
Assert(!isNull);
351+
352+
SPI_END();
353+
354+
return attrNo;
355+
}
356+
283357

284358
/*
285359
* GetDataFileSchemaForInternalIcebergTable gets a table schema based on the MAPPING_TABLE_NAME.

0 commit comments

Comments
 (0)