Skip to content

Commit 3721417

Browse files
skip building transform if external table spec field is from old schema
Signed-off-by: Aykut Bozkurt <[email protected]>
1 parent f2540e4 commit 3721417

File tree

3 files changed

+54
-28
lines changed

3 files changed

+54
-28
lines changed

pg_lake_table/include/pg_lake/fdw/schema_operations/field_id_mapping_catalog.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ extern PGDLLEXPORT DataFileSchema * GetDataFileSchemaForInternalIcebergTable(Oid
3131
extern PGDLLEXPORT List *GetLeafFieldsForInternalIcebergTable(Oid relationId);
3232
extern PGDLLEXPORT List *GetRegisteredFieldForAttributes(Oid relationId, List *attrNos);
3333
extern PGDLLEXPORT DataFileSchemaField * GetRegisteredFieldForAttribute(Oid relationId, AttrNumber attrNo);
34-
extern PGDLLEXPORT AttrNumber GetAttributeForFieldId(Oid relationId, int fieldId);
34+
extern PGDLLEXPORT AttrNumber GetAttributeForFieldId(Oid relationId, int fieldId, bool *inCurrentSchema);
3535
extern PGDLLEXPORT void UpdateRegisteredFieldWriteDefaultForAttribute(Oid relationId, AttrNumber attNum, const char *writeDefault);
3636
extern PGDLLEXPORT int GetLargestRegisteredFieldId(Oid relationId);
3737
extern PGDLLEXPORT void RegisterIcebergColumnMapping(Oid relationId, Field * field,

pg_lake_table/src/fdw/partition_transform.c

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ static void *ApplyDayTransformToColumn(IcebergPartitionTransform * transform,
5959
static void *ApplyHourTransformToColumn(IcebergPartitionTransform * transform,
6060
Datum columnValue, bool isNull,
6161
size_t *valueSize);
62-
static IcebergPartitionTransform * GetPartitionTransformFromSpecField(Oid relationId,
62+
static IcebergPartitionTransform * GetPartitionTransformFromSpecField(Oid relationId, AttrNumber attNum,
6363
IcebergPartitionSpecField * specField);
6464
static void ParseTransformName(const char *name, IcebergPartitionTransformType * type,
6565
size_t *bucketCount, size_t *truncateLen);
@@ -196,7 +196,19 @@ GetPartitionTransformsFromSpecFields(Oid relationId, List *specFields)
196196
{
197197
IcebergPartitionSpecField *specField = lfirst(lc);
198198

199-
IcebergPartitionTransform *transform = GetPartitionTransformFromSpecField(relationId, specField);
199+
bool inCurrentSchema = false;
200+
201+
AttrNumber attNum =
202+
GetAttributeForFieldId(relationId, specField->source_id, &inCurrentSchema);
203+
204+
/*
205+
* skip building transform if there is no corresponding attribute in
206+
* the current postgres schema, which means no pruning is possible
207+
*/
208+
if (!inCurrentSchema)
209+
continue;
210+
211+
IcebergPartitionTransform *transform = GetPartitionTransformFromSpecField(relationId, attNum, specField);
200212

201213
Assert(transform != NULL);
202214
transformList = lappend(transformList, transform);
@@ -213,16 +225,14 @@ GetPartitionTransformsFromSpecFields(Oid relationId, List *specFields)
213225
* bucket count, and truncate length for ease of use.
214226
*/
215227
static IcebergPartitionTransform *
216-
GetPartitionTransformFromSpecField(Oid relationId, IcebergPartitionSpecField * specField)
228+
GetPartitionTransformFromSpecField(Oid relationId, AttrNumber attNum, IcebergPartitionSpecField * specField)
217229
{
218230
IcebergPartitionTransform *transform = palloc0(sizeof(IcebergPartitionTransform));
219231

220232
transform->partitionFieldId = specField->field_id;
221233
transform->partitionFieldName = pstrdup(specField->name);
222234
transform->transformName = pstrdup(specField->transform);
223-
224-
transform->attnum =
225-
GetAttributeForFieldId(relationId, specField->source_id);
235+
transform->attnum = attNum;
226236
transform->columnName = get_attname(relationId, transform->attnum, false);
227237
transform->pgType = GetAttributePGType(relationId, transform->attnum);
228238

pg_lake_table/src/fdw/schema_operations/field_id_mapping_catalog.c

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ static void InsertFieldMapping(Oid relationId, int attrIcebergFieldId,
5656
AttrNumber pg_attnum, PGType pgType,
5757
const char *writeDefault, const char *initialDefault,
5858
int parentFieldId);
59-
static AttrNumber GetAttributeForFieldIdForInternalIcebergTable(Oid relationId, int fieldId);
60-
static AttrNumber GetAttributeForFieldIdForExternalIcebergTable(char *metadataPath, Oid relationId, int fieldId);
59+
static AttrNumber GetAttributeForFieldIdForInternalIcebergTable(Oid relationId, int fieldId, bool *inCurrentSchema);
60+
static AttrNumber GetAttributeForFieldIdForExternalIcebergTable(char *metadataPath, Oid relationId, int fieldId, bool *inCurrentSchema);
6161

6262
#ifdef USE_ASSERT_CHECKING
6363
static List *GetAllRegisteredAttnumsForTopLevelColumns(Oid relationId);
@@ -244,10 +244,12 @@ GetRegisteredFieldForAttribute(Oid relationId, AttrNumber attrNo)
244244
}
245245

246246
/*
247-
* GetAttributeForFieldId gets the attribute number for a given field ID.
248-
*/
247+
* GetAttributeForFieldId gets the attribute number for a given field ID.
248+
* It sets *inCurrentSchema to false if there is no attribute in the current
249+
* postgres schema which corresponds to the field.
250+
*/
249251
AttrNumber
250-
GetAttributeForFieldId(Oid relationId, int fieldId)
252+
GetAttributeForFieldId(Oid relationId, int fieldId, bool *inCurrentSchema)
251253
{
252254
if (IsInternalIcebergTable(relationId))
253255
{
@@ -259,17 +261,18 @@ GetAttributeForFieldId(Oid relationId, int fieldId)
259261

260262
char *currentMetadataPath = GetIcebergMetadataLocation(relationId, false);
261263

262-
return GetAttributeForFieldIdForExternalIcebergTable(currentMetadataPath, relationId, fieldId);
264+
return GetAttributeForFieldIdForExternalIcebergTable(currentMetadataPath, relationId, fieldId, inCurrentSchema);
263265
}
264266
}
265267

266268

267269
/*
268270
* GetAttributeForFieldIdForInternalIcebergTable gets the attribute number for a given field ID
269-
* for internal Iceberg tables from catalog.
271+
* for internal Iceberg tables from catalog. It also reports, via inCurrentSchema, whether the
272+
* corresponding attribute is in the current postgres schema (i.e. the attribute is not dropped).
270273
*/
271274
static AttrNumber
272-
GetAttributeForFieldIdForInternalIcebergTable(Oid relationId, int fieldId)
275+
GetAttributeForFieldIdForInternalIcebergTable(Oid relationId, int fieldId, bool *inCurrentSchema)
273276
{
274277
DECLARE_SPI_ARGS(2);
275278

@@ -288,17 +291,26 @@ GetAttributeForFieldIdForInternalIcebergTable(Oid relationId, int fieldId)
288291
*/
289292
bool readOnly = false;
290293

291-
SPI_EXECUTE("SELECT pg_attnum FROM " MAPPING_TABLE_NAME
294+
SPI_EXECUTE("SELECT m.pg_attnum, not attisdropped as in_current_schema "
295+
"FROM " MAPPING_TABLE_NAME " m JOIN pg_attribute p ON "
296+
" m.table_name OPERATOR(pg_catalog.=) p.attrelid "
297+
" AND m.pg_attnum OPERATOR(pg_catalog.=) p.attnum"
292298
" WHERE table_name OPERATOR(pg_catalog.=) $1"
293299
" AND field_id OPERATOR(pg_catalog.=) $2", readOnly);
294300

295301
/* there is a primary key on these filters */
296302
Assert(SPI_processed == 1);
297303

298-
bool isNull = false;
299-
AttrNumber attrNo = GET_SPI_VALUE(INT2OID, 0, 1, &isNull);
304+
bool isAttrNumNull = false;
305+
AttrNumber attrNo = GET_SPI_VALUE(INT2OID, 0, 1, &isAttrNumNull);
306+
307+
Assert(!isAttrNumNull);
300308

301-
Assert(!isNull);
309+
bool inCurrentSchemaNull = false;
310+
311+
*inCurrentSchema = GET_SPI_VALUE(BOOLOID, 0, 2, &inCurrentSchemaNull);
312+
313+
Assert(!inCurrentSchemaNull);
302314

303315
SPI_END();
304316

@@ -311,7 +323,7 @@ GetAttributeForFieldIdForInternalIcebergTable(Oid relationId, int fieldId)
311323
* for external Iceberg tables from iceberg metadata.
312324
*/
313325
static AttrNumber
314-
GetAttributeForFieldIdForExternalIcebergTable(char *metadataPath, Oid relationId, int fieldId)
326+
GetAttributeForFieldIdForExternalIcebergTable(char *metadataPath, Oid relationId, int fieldId, bool *inCurrentSchema)
315327
{
316328
DataFileSchema *schema = GetDataFileSchemaForExternalIcebergTable(metadataPath);
317329

@@ -338,16 +350,20 @@ GetAttributeForFieldIdForExternalIcebergTable(char *metadataPath, Oid relationId
338350

339351
SPI_EXECUTE("SELECT attnum FROM pg_attribute "
340352
"WHERE attrelid OPERATOR(pg_catalog.=) $1 "
341-
"AND attname OPERATOR(pg_catalog.=) $2 "
342-
"AND NOT attisdropped", readOnly);
353+
"AND attname OPERATOR(pg_catalog.=) $2", readOnly);
343354

344-
/* there is a primary key on these filters */
345-
Assert(SPI_processed == 1);
355+
AttrNumber attrNo = 0;
346356

347-
bool isNull = false;
348-
AttrNumber attrNo = GET_SPI_VALUE(INT2OID, 0, 1, &isNull);
349-
350-
Assert(!isNull);
357+
if (SPI_processed == 0)
358+
{
359+
*inCurrentSchema = false;
360+
attrNo = -1;
361+
}
362+
else
363+
{
364+
Assert(SPI_processed == 1);
365+
attrNo = GET_SPI_VALUE(INT2OID, 0, 1, inCurrentSchema);
366+
}
351367

352368
SPI_END();
353369

0 commit comments

Comments
 (0)