Skip to content

Commit 2c7287f

Browse files
Do not generate a new schema if one already exists (#89)
1 parent bfa38ce commit 2c7287f

File tree

9 files changed

+336
-47
lines changed

9 files changed

+336
-47
lines changed

pg_lake_engine/include/pg_lake/data_file/data_files.h

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,24 @@ typedef enum TableMetadataOperationType
102102
struct IcebergPartitionSpec;
103103
struct Partition;
104104

105+
106+
/*
107+
* When a schema is set back to an existing schema in iceberg metadata,
108+
* we use DDL_EFFECT_SET_EXISTING_SCHEMA to indicate which effect the
109+
* DDL has. An example would be ALTER TABLE .. ADD COLUMN x; followed by
110+
* ALTER TABLE .. DROP COLUMN x; which results in the same schema as before.
111+
* In all other cases, we use DDL_EFFECT_ADD_SCHEMA as that'd mean there is
112+
* no existing schema to set to, and we should add a new schema.
113+
* ps: This is especially important for REST catalog/Polaris, where it rejects
114+
* identical schema addition.
115+
*/
116+
typedef enum DDLSchemaEffect
117+
{
118+
DDL_EFFECT_NONE = 0,
119+
DDL_EFFECT_ADD_SCHEMA = 1,
120+
DDL_EFFECT_SET_EXISTING_SCHEMA = 2,
121+
} DDLSchemaEffect;
122+
105123
/*
106124
* TableMetadataOperation represents an operation on table metadata.
107125
*/
@@ -125,8 +143,13 @@ typedef struct TableMetadataOperation
125143
/* for a new deletion file, from which data file are we deleting? */
126144
char *deletedFrom;
127145

128-
/* relevant to TABLE_DDL event, up-to-date schema */
129-
DataFileSchema *schema;
146+
/*
147+
* newSchema and existingSchemaId are mutually exclusive, set according to
148+
* ddlSchemaEffect.
149+
*/
150+
DDLSchemaEffect ddlSchemaEffect;
151+
DataFileSchema *newSchema;
152+
int32_t existingSchemaId;
130153

131154
/* for multi-delete, which files we are deleting from */
132155
List *deleteStats;

pg_lake_engine/include/pg_lake/parquet/leaf_field.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ typedef struct LeafField
5050
} LeafField;
5151

5252
extern PGDLLEXPORT int LeafFieldCompare(const ListCell *a, const ListCell *b);
53+
extern PGDLLEXPORT bool SchemaFieldsEquivalent(DataFileSchemaField * fieldA, DataFileSchemaField * fieldB);
5354
#if PG_VERSION_NUM < 170000
5455
extern PGDLLEXPORT int pg_cmp_s32(int32 a, int32 b);
5556
#endif

pg_lake_engine/include/pg_lake/util/string_utils.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ extern PGDLLEXPORT char *EscapedStringLiteral(const char *val);
3333
extern PGDLLEXPORT char *ReverseStringSearch(const char *haystack, const char *needle);
3434
extern PGDLLEXPORT int32_t AdjustAnyCharTypmod(int32_t typmod, int32_t newLength);
3535
extern PGDLLEXPORT int32_t GetAnyCharLengthFrom(int32_t typmod);
36-
36+
extern PGDLLEXPORT bool PgStrcasecmpNullable(const char *a, const char *b);
3737

3838
#define RangeVarQuoteIdentifier(rv) \
3939
(((rv)->schemaname != NULL) ? \

pg_lake_engine/src/parquet/field.c

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include "pg_lake/parquet/field.h"
2323
#include "pg_lake/parquet/leaf_field.h"
24+
#include "pg_lake/util/string_utils.h"
2425

2526
static FieldStructElement * DeepCopyFieldStructElement(FieldStructElement * structElementField);
2627
static Field * DeepCopyField(const Field * field);
@@ -145,3 +146,40 @@ pg_cmp_s32(int32 a, int32 b)
145146
return (a > b) - (a < b);
146147
}
147148
#endif
149+
150+
151+
152+
/*
153+
* SchemaFieldsEquivalent compares two DataFileSchemaField structs for equivalence.
154+
* It returns true if they are equivalent, false otherwise.
155+
* Note that we do not compare the field->type here, as we do not allow changing
156+
* the type of any field in the schema, including nested types.
157+
*/
158+
bool
159+
SchemaFieldsEquivalent(DataFileSchemaField * fieldA, DataFileSchemaField * fieldB)
160+
{
161+
if (fieldA->id != fieldB->id)
162+
return false;
163+
164+
if (!PgStrcasecmpNullable(fieldA->name, fieldB->name))
165+
return false;
166+
167+
if (fieldA->required != fieldB->required)
168+
return false;
169+
170+
if (!PgStrcasecmpNullable(fieldA->doc, fieldB->doc))
171+
return false;
172+
173+
if (!PgStrcasecmpNullable(fieldA->writeDefault, fieldB->writeDefault))
174+
return false;
175+
176+
if (!PgStrcasecmpNullable(fieldA->initialDefault, fieldB->initialDefault))
177+
return false;
178+
179+
/*
180+
* We don't allow changing any of the types of the fields in the schema,
181+
* including the fields of nested types. So we don't need to compare
182+
* anything about the field->type here.
183+
*/
184+
return true;
185+
}

pg_lake_engine/src/utils/string_utils.c

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,3 +245,19 @@ GetAnyCharLengthFrom(int32_t typmod)
245245

246246
return typmod - VARHDRSZ;
247247
}
248+
249+
250+
/*
251+
* PgStrcasecmpNullable compares two strings for equality, treating NULLs as equal.
252+
*/
253+
bool
254+
PgStrcasecmpNullable(const char *a, const char *b)
255+
{
256+
if (a == NULL && b == NULL)
257+
return true;
258+
259+
if (a == NULL || b == NULL)
260+
return false;
261+
262+
return pg_strcasecmp(a, b) == 0;
263+
}

pg_lake_iceberg/src/iceberg/api/table_metadata.c

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ static void SetSnapshotReference(IcebergTableMetadata * metadata, uint64_t snaps
7676
static void GroupExpiredSnapshots(IcebergTableMetadata * metadata, IcebergSnapshot * expiredSnapshots, int *expiredSnapshotCount,
7777
IcebergSnapshot * nonExpiredSnapshots, int *nonExpiredSnapshotCount);
7878
static bool SnapshotAgeExceeded(IcebergSnapshot * snapshot, int64_t currentTimeMs, int maxAge);
79+
static int32_t MaxSchemaId(IcebergTableSchema * schemas, size_t schemasLength);
7980

8081
/*
8182
* GenerateEmptyTableMetadata generates an empty iceberg table metadata
@@ -544,7 +545,6 @@ void
544545
AppendCurrentPostgresSchema(Oid relationId, IcebergTableMetadata * metadata,
545546
DataFileSchema * schema)
546547
{
547-
int currentSchemaId = metadata->current_schema_id;
548548
int currentSchemaLength = metadata->schemas_length;
549549

550550
IcebergTableSchema *newSchema = RebuildIcebergSchemaFromDataFileSchema(relationId, schema, &metadata->last_column_id);
@@ -560,13 +560,33 @@ AppendCurrentPostgresSchema(Oid relationId, IcebergTableMetadata * metadata,
560560
}
561561

562562
/* first schema should always start with id=0 */
563-
newSchema->schema_id = (currentSchemaLength == 0) ? 0 : currentSchemaId + 1;
563+
newSchema->schema_id = (currentSchemaLength == 0) ? 0 : MaxSchemaId(metadata->schemas, currentSchemaLength) + 1;
564564
metadata->schemas[currentSchemaLength] = *newSchema;
565565
metadata->schemas_length = currentSchemaLength + 1;
566566
metadata->current_schema_id = newSchema->schema_id;
567567
}
568568

569569

570+
/*
571+
* MaxSchemaId finds the maximum schema ID from the given schemas.
572+
*/
573+
static int32_t
574+
MaxSchemaId(IcebergTableSchema * schemas, size_t schemasLength)
575+
{
576+
int32_t maxSchemaId = -1;
577+
578+
for (size_t i = 0; i < schemasLength; i++)
579+
{
580+
if (schemas[i].schema_id > maxSchemaId)
581+
{
582+
maxSchemaId = schemas[i].schema_id;
583+
}
584+
}
585+
586+
Assert(maxSchemaId != -1);
587+
return maxSchemaId;
588+
}
589+
570590
/*
571591
* AppendPartitionSpec appends given partition spec to the metadata.
572592
*/

pg_lake_iceberg/src/iceberg/metadata_operations.c

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ typedef struct IcebergSnapshotBuilder
8282
/* whether to apply manifest compaction */
8383
bool applyManifestCompaction;
8484

85-
/* a DDL has changed the iceberg schema */
85+
/* a DDL has changed the iceberg schema or set to an existing schema */
8686
bool regenerateSchema;
8787

8888
/* a DDL has changed partition specs */
@@ -96,8 +96,10 @@ typedef struct IcebergSnapshotBuilder
9696
/* whether to expire old snapshots */
9797
bool expireOldSnapshots;
9898

99-
/* up-to-date schema for table */
99+
/* new schema */
100100
DataFileSchema *schema;
101+
/* a DDL has set to an existing schema */
102+
int32_t schemaId;
101103

102104
/* snapshot operation */
103105
SnapshotOperation operation;
@@ -183,7 +185,10 @@ ApplyIcebergMetadataChanges(Oid relationId, List *metadataOperations, List *allT
183185

184186
if (builder->createTable || builder->regenerateSchema)
185187
{
186-
AppendCurrentPostgresSchema(relationId, metadata, builder->schema);
188+
if (builder->schema != NULL)
189+
AppendCurrentPostgresSchema(relationId, metadata, builder->schema);
190+
else
191+
metadata->current_schema_id = builder->schemaId;
187192
}
188193

189194
if (builder->createTable || builder->regeneratePartitionSpec)
@@ -588,7 +593,7 @@ ProcessIcebergMetadataOperations(Oid relationId, List *metadataOperations,
588593
Assert(!builder->createTable);
589594

590595
builder->createTable = true;
591-
builder->schema = operation->schema;
596+
builder->schema = operation->newSchema;
592597
builder->partitionSpecs = operation->partitionSpecs;
593598
builder->defaultSpecId = operation->defaultSpecId;
594599

@@ -604,14 +609,33 @@ ProcessIcebergMetadataOperations(Oid relationId, List *metadataOperations,
604609
*/
605610
Assert(!builder->regenerateSchema);
606611

612+
builder->regenerateSchema = true;
613+
607614
/*
608615
* We are requested to update the table schema, and we'll
609616
* handle this operation in
610617
* TrackIcebergMetadataChangesInTx().
611618
*/
612619

613-
builder->regenerateSchema = true;
614-
builder->schema = operation->schema;
620+
if (operation->ddlSchemaEffect == DDL_EFFECT_ADD_SCHEMA)
621+
{
622+
/* these two are mutually exclusive */
623+
Assert(operation->existingSchemaId == -1);
624+
Assert(operation->newSchema != NULL);
625+
626+
builder->schema = operation->newSchema;
627+
}
628+
else if (operation->ddlSchemaEffect == DDL_EFFECT_SET_EXISTING_SCHEMA)
629+
{
630+
/* these two are mutually exclusive */
631+
Assert(operation->existingSchemaId != -1);
632+
Assert(operation->newSchema == NULL);
633+
builder->schemaId = operation->existingSchemaId;
634+
}
635+
else
636+
{
637+
ereport(ERROR, (errmsg("Unsupported DDL schema effect: %d", operation->ddlSchemaEffect)));
638+
}
615639

616640
break;
617641
}

pg_lake_table/src/transaction/track_iceberg_metadata_changes.c

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ static List *GetDataFileMetadataOperations(const TableMetadataOperationTracker *
5050
List *allTransforms);
5151
static List *GetDDLMetadataOperations(const TableMetadataOperationTracker * opTracker);
5252
static void DeleteInProgressAddedFiles(Oid relationId, List *addedFiles);
53+
static bool AreSchemasEqual(IcebergTableSchema * existingSchema, DataFileSchema * newSchema);
54+
static int32_t GetSchemaIdForIcebergTableIfExists(const TableMetadataOperationTracker * opTracker, DataFileSchema * schema);
5355
static int ComparePartitionSpecsById(const ListCell *a, const ListCell *b);
5456

5557

@@ -596,7 +598,7 @@ GetDDLMetadataOperations(const TableMetadataOperationTracker * opTracker)
596598
TableMetadataOperation *createOp = palloc0(sizeof(TableMetadataOperation));
597599

598600
createOp->type = TABLE_CREATE;
599-
createOp->schema = schema;
601+
createOp->newSchema = schema;
600602
createOp->partitionSpecs = newPartitionSpecs;
601603
createOp->defaultSpecId = defaultSpecId;
602604

@@ -611,10 +613,30 @@ GetDDLMetadataOperations(const TableMetadataOperationTracker * opTracker)
611613

612614
if (opTracker->relationAltered)
613615
{
616+
/*
617+
* When a table is altered, its schema has changed. However, it might
618+
* have been set to an existing schema in the iceberg metadata, if so,
619+
* use that schemaId.
620+
*/
614621
TableMetadataOperation *ddlOp = palloc0(sizeof(TableMetadataOperation));
615622

616623
ddlOp->type = TABLE_DDL;
617-
ddlOp->schema = schema;
624+
625+
int32_t existingSchemaId =
626+
GetSchemaIdForIcebergTableIfExists(opTracker, schema);
627+
628+
if (existingSchemaId != -1)
629+
{
630+
ddlOp->ddlSchemaEffect = DDL_EFFECT_SET_EXISTING_SCHEMA;
631+
ddlOp->existingSchemaId = existingSchemaId;
632+
ddlOp->newSchema = NULL;
633+
}
634+
else
635+
{
636+
ddlOp->ddlSchemaEffect = DDL_EFFECT_ADD_SCHEMA;
637+
ddlOp->newSchema = schema;
638+
ddlOp->existingSchemaId = -1;
639+
}
618640

619641
operations = lappend(operations, ddlOp);
620642
}
@@ -645,3 +667,52 @@ ComparePartitionSpecsById(const ListCell *a, const ListCell *b)
645667

646668
return pg_cmp_s32(specA->spec_id, specB->spec_id);
647669
}
670+
671+
672+
/*
673+
* GetSchemaIdForIcebergTableIfExists checks if the given schema already exists
674+
* in the iceberg table metadata. If it exists, it returns the schema ID, otherwise -1.
675+
*/
676+
static int32_t
677+
GetSchemaIdForIcebergTableIfExists(const TableMetadataOperationTracker * opTracker, DataFileSchema * schema)
678+
{
679+
IcebergTableMetadata *metadata = GetLastPushedIcebergMetadata(opTracker);
680+
681+
if (metadata == NULL)
682+
return -1;
683+
684+
IcebergTableSchema *schemas = metadata->schemas;
685+
686+
for (int schemaIndex = 0; schemaIndex < metadata->schemas_length; schemaIndex++)
687+
{
688+
IcebergTableSchema *existingSchema = &schemas[schemaIndex];
689+
690+
if (AreSchemasEqual(existingSchema, schema))
691+
return schemas[schemaIndex].schema_id;
692+
693+
}
694+
695+
return -1;
696+
}
697+
698+
699+
/*
700+
* AreSchemasEqual compares two schemas for equality.
701+
*/
702+
static bool
703+
AreSchemasEqual(IcebergTableSchema * existingSchema, DataFileSchema * newSchema)
704+
{
705+
if (existingSchema->fields_length != newSchema->nfields)
706+
return false;
707+
708+
for (size_t i = 0; i < existingSchema->fields_length; i++)
709+
{
710+
DataFileSchemaField *existingField = &existingSchema->fields[i];
711+
DataFileSchemaField *newField = &newSchema->fields[i];
712+
713+
if (!SchemaFieldsEquivalent(existingField, newField))
714+
return false;
715+
}
716+
717+
return true;
718+
}

0 commit comments

Comments
 (0)