Skip to content

Commit f01087b

Browse files
skip building transform if external table spec field is from old schema
Signed-off-by: Aykut Bozkurt <[email protected]>
1 parent 8d23445 commit f01087b

File tree

3 files changed

+59
-31
lines changed

3 files changed

+59
-31
lines changed

pg_lake_table/include/pg_lake/fdw/schema_operations/field_id_mapping_catalog.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ extern PGDLLEXPORT DataFileSchema * GetDataFileSchemaForInternalIcebergTable(Oid
3131
extern PGDLLEXPORT List *GetLeafFieldsForInternalIcebergTable(Oid relationId);
3232
extern PGDLLEXPORT List *GetRegisteredFieldForAttributes(Oid relationId, List *attrNos);
3333
extern PGDLLEXPORT DataFileSchemaField * GetRegisteredFieldForAttribute(Oid relationId, AttrNumber attrNo);
34-
extern PGDLLEXPORT AttrNumber GetAttributeForFieldId(Oid relationId, int fieldId);
34+
extern PGDLLEXPORT AttrNumber GetAttributeForFieldId(Oid relationId, int fieldId, bool *inCurrentSchema);
3535
extern PGDLLEXPORT void UpdateRegisteredFieldWriteDefaultForAttribute(Oid relationId, AttrNumber attNum, const char *writeDefault);
3636
extern PGDLLEXPORT int GetLargestRegisteredFieldId(Oid relationId);
3737
extern PGDLLEXPORT void RegisterIcebergColumnMapping(Oid relationId, Field * field,

pg_lake_table/src/fdw/partition_transform.c

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ static void *ApplyDayTransformToColumn(IcebergPartitionTransform * transform,
5959
static void *ApplyHourTransformToColumn(IcebergPartitionTransform * transform,
6060
Datum columnValue, bool isNull,
6161
size_t *valueSize);
62-
static IcebergPartitionTransform * GetPartitionTransformFromSpecField(Oid relationId,
62+
static IcebergPartitionTransform * GetPartitionTransformFromSpecField(Oid relationId, AttrNumber attNum,
6363
IcebergPartitionSpecField * specField);
6464
static void ParseTransformName(const char *name, IcebergPartitionTransformType * type,
6565
size_t *bucketCount, size_t *truncateLen);
@@ -196,7 +196,19 @@ GetPartitionTransformsFromSpecFields(Oid relationId, List *specFields)
196196
{
197197
IcebergPartitionSpecField *specField = lfirst(lc);
198198

199-
IcebergPartitionTransform *transform = GetPartitionTransformFromSpecField(relationId, specField);
199+
bool inCurrentSchema = false;
200+
201+
AttrNumber attNum =
202+
GetAttributeForFieldId(relationId, specField->source_id, &inCurrentSchema);
203+
204+
/*
205+
* skip building transform if there is no corresponding attribute in
206+
* the current postgres schema, which means no pruning is possible
207+
*/
208+
if (!inCurrentSchema)
209+
continue;
210+
211+
IcebergPartitionTransform *transform = GetPartitionTransformFromSpecField(relationId, attNum, specField);
200212

201213
Assert(transform != NULL);
202214
transformList = lappend(transformList, transform);
@@ -213,27 +225,25 @@ GetPartitionTransformsFromSpecFields(Oid relationId, List *specFields)
213225
* bucket count, and truncate length for ease of use.
214226
*/
215227
static IcebergPartitionTransform *
216-
GetPartitionTransformFromSpecField(Oid relationId, IcebergPartitionSpecField * specField)
228+
GetPartitionTransformFromSpecField(Oid relationId, AttrNumber attNum, IcebergPartitionSpecField * specField)
217229
{
218230
IcebergPartitionTransform *transform = palloc0(sizeof(IcebergPartitionTransform));
219231

220232
transform->partitionFieldId = specField->field_id;
221233
transform->partitionFieldName = pstrdup(specField->name);
222234
transform->transformName = pstrdup(specField->transform);
223-
224-
transform->attnum =
225-
GetAttributeForFieldId(relationId, specField->source_id);
235+
transform->attnum = attNum;
226236
transform->columnName = get_attname(relationId, transform->attnum, false);
227237
transform->pgType = GetAttributePGType(relationId, transform->attnum);
228238

229-
Assert(IsAnyIcebergTable(relationId));
230-
231239
if (IsAnyInternalIcebergTable(relationId))
232240
{
233241
transform->sourceField = GetRegisteredFieldForAttribute(relationId, transform->attnum);
234242
}
235243
else
236244
{
245+
Assert(IsAnyExternalIcebergTable(relationId));
246+
237247
DataFileSchema *schema = GetDataFileSchemaForTable(relationId);
238248

239249
transform->sourceField = GetDataFileSchemaFieldById(schema, specField->source_id);

pg_lake_table/src/fdw/schema_operations/field_id_mapping_catalog.c

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ static void InsertFieldMapping(Oid relationId, int attrIcebergFieldId,
5656
AttrNumber pg_attnum, PGType pgType,
5757
const char *writeDefault, const char *initialDefault,
5858
int parentFieldId);
59-
static AttrNumber GetAttributeForFieldIdForInternalIcebergTable(Oid relationId, int fieldId);
60-
static AttrNumber GetAttributeForFieldIdForExternalIcebergTable(char *metadataPath, Oid relationId, int fieldId);
59+
static AttrNumber GetAttributeForFieldIdForInternalIcebergTable(Oid relationId, int fieldId, bool *inCurrentSchema);
60+
static AttrNumber GetAttributeForFieldIdForExternalIcebergTable(char *metadataPath, Oid relationId, int fieldId, bool *inCurrentSchema);
6161

6262
#ifdef USE_ASSERT_CHECKING
6363
static List *GetAllRegisteredAttnumsForTopLevelColumns(Oid relationId);
@@ -244,30 +244,35 @@ GetRegisteredFieldForAttribute(Oid relationId, AttrNumber attrNo)
244244
}
245245

246246
/*
247-
* GetAttributeForFieldId gets the attribute number for a given field ID.
248-
*/
247+
* GetAttributeForFieldId gets the attribute number for a given field ID.
248+
* It sets inCurrentSchema if there is no attribute in current postgres schema
249+
* which corresponds to the field.
250+
*/
249251
AttrNumber
250-
GetAttributeForFieldId(Oid relationId, int fieldId)
252+
GetAttributeForFieldId(Oid relationId, int fieldId, bool *inCurrentSchema)
251253
{
252254
Assert(IsAnyIcebergTable(relationId));
253255

254256
if (IsAnyInternalIcebergTable(relationId))
255-
return GetAttributeForFieldIdForInternalIcebergTable(relationId, fieldId);
257+
{
258+
return GetAttributeForFieldIdForInternalIcebergTable(relationId, fieldId, inCurrentSchema);
259+
}
256260
else
257261
{
258262
char *currentMetadataPath = GetIcebergMetadataLocation(relationId, false);
259263

260-
return GetAttributeForFieldIdForExternalIcebergTable(currentMetadataPath, relationId, fieldId);
264+
return GetAttributeForFieldIdForExternalIcebergTable(currentMetadataPath, relationId, fieldId, inCurrentSchema);
261265
}
262266
}
263267

264268

265269
/*
266270
* GetAttributeForFieldIdForInternalIcebergTable gets the attribute number for a given field ID
267-
* for internal Iceberg tables from catalog.
271+
* for internal Iceberg tables from catalog. It also sets if the corresponding attribute is in the
272+
* current postgres schema.
268273
*/
269274
static AttrNumber
270-
GetAttributeForFieldIdForInternalIcebergTable(Oid relationId, int fieldId)
275+
GetAttributeForFieldIdForInternalIcebergTable(Oid relationId, int fieldId, bool *inCurrentSchema)
271276
{
272277
DECLARE_SPI_ARGS(2);
273278

@@ -286,17 +291,26 @@ GetAttributeForFieldIdForInternalIcebergTable(Oid relationId, int fieldId)
286291
*/
287292
bool readOnly = false;
288293

289-
SPI_EXECUTE("SELECT pg_attnum FROM " MAPPING_TABLE_NAME
294+
SPI_EXECUTE("SELECT m.pg_attnum, not attisdropped as in_current_schema "
295+
"FROM " MAPPING_TABLE_NAME " m JOIN pg_attribute p ON "
296+
" m.table_name OPERATOR(pg_catalog.=) p.attrelid "
297+
" AND m.pg_attnum OPERATOR(pg_catalog.=) p.attnum"
290298
" WHERE table_name OPERATOR(pg_catalog.=) $1"
291299
" AND field_id OPERATOR(pg_catalog.=) $2", readOnly);
292300

293301
/* there is a primary key on these filters */
294302
Assert(SPI_processed == 1);
295303

296-
bool isNull = false;
297-
AttrNumber attrNo = GET_SPI_VALUE(INT2OID, 0, 1, &isNull);
304+
bool isAttrNumNull = false;
305+
AttrNumber attrNo = GET_SPI_VALUE(INT2OID, 0, 1, &isAttrNumNull);
298306

299-
Assert(!isNull);
307+
Assert(!isAttrNumNull);
308+
309+
bool inCurrentSchemaNull = false;
310+
311+
*inCurrentSchema = GET_SPI_VALUE(BOOLOID, 0, 2, &inCurrentSchemaNull);
312+
313+
Assert(!inCurrentSchemaNull);
300314

301315
SPI_END();
302316

@@ -309,7 +323,7 @@ GetAttributeForFieldIdForInternalIcebergTable(Oid relationId, int fieldId)
309323
* for external Iceberg tables from iceberg metadata.
310324
*/
311325
static AttrNumber
312-
GetAttributeForFieldIdForExternalIcebergTable(char *metadataPath, Oid relationId, int fieldId)
326+
GetAttributeForFieldIdForExternalIcebergTable(char *metadataPath, Oid relationId, int fieldId, bool *inCurrentSchema)
313327
{
314328
DataFileSchema *schema = GetDataFileSchemaForExternalIcebergTable(metadataPath);
315329

@@ -336,16 +350,20 @@ GetAttributeForFieldIdForExternalIcebergTable(char *metadataPath, Oid relationId
336350

337351
SPI_EXECUTE("SELECT attnum FROM pg_attribute "
338352
"WHERE attrelid OPERATOR(pg_catalog.=) $1 "
339-
"AND attname OPERATOR(pg_catalog.=) $2 "
340-
"AND NOT attisdropped", readOnly);
353+
"AND attname OPERATOR(pg_catalog.=) $2", readOnly);
341354

342-
/* there is a primary key on these filters */
343-
Assert(SPI_processed == 1);
344-
345-
bool isNull = false;
346-
AttrNumber attrNo = GET_SPI_VALUE(INT2OID, 0, 1, &isNull);
355+
AttrNumber attrNo = 0;
347356

348-
Assert(!isNull);
357+
if (SPI_processed == 0)
358+
{
359+
*inCurrentSchema = false;
360+
attrNo = -1;
361+
}
362+
else
363+
{
364+
Assert(SPI_processed == 1);
365+
attrNo = GET_SPI_VALUE(INT2OID, 0, 1, inCurrentSchema);
366+
}
349367

350368
SPI_END();
351369

0 commit comments

Comments
 (0)