Skip to content

Commit 425186f

Browse files
Merge pull request #26794: #26789 Fix auto schema update when schema order has changed. (#26810)
Co-authored-by: reuvenlax <[email protected]>
1 parent 9d8b605 commit 425186f

File tree

5 files changed

+409
-15
lines changed

5 files changed

+409
-15
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.List;
3939
import java.util.Map;
4040
import java.util.Objects;
41+
import java.util.Optional;
4142
import java.util.Random;
4243
import java.util.Set;
4344
import java.util.concurrent.ExecutorService;
@@ -685,16 +686,22 @@ String retrieveErrorDetails(Iterable<AppendRowsContext> failedContext) {
685686

686687
void postFlush() {
687688
// If we got a response indicating an updated schema, recreate the client.
688-
if (this.appendClientInfo != null) {
689+
if (this.appendClientInfo != null && autoUpdateSchema) {
689690
@Nullable
690691
StreamAppendClient streamAppendClient = appendClientInfo.getStreamAppendClient();
691692
@Nullable
692-
TableSchema updatedTableSchema =
693+
TableSchema updatedTableSchemaReturned =
693694
(streamAppendClient != null) ? streamAppendClient.getUpdatedSchema() : null;
694-
if (updatedTableSchema != null && autoUpdateSchema) {
695-
invalidateWriteStream();
696-
appendClientInfo =
697-
Preconditions.checkStateNotNull(getAppendClientInfo(false, updatedTableSchema));
695+
if (updatedTableSchemaReturned != null) {
696+
Optional<TableSchema> updatedTableSchema =
697+
TableSchemaUpdateUtils.getUpdatedSchema(
698+
this.initialTableSchema, updatedTableSchemaReturned);
699+
if (updatedTableSchema.isPresent()) {
700+
invalidateWriteStream();
701+
appendClientInfo =
702+
Preconditions.checkStateNotNull(
703+
getAppendClientInfo(false, updatedTableSchema.get()));
704+
}
698705
}
699706
}
700707
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.Arrays;
3737
import java.util.List;
3838
import java.util.Map;
39+
import java.util.Optional;
3940
import java.util.Set;
4041
import java.util.concurrent.Callable;
4142
import java.util.concurrent.ExecutorService;
@@ -751,16 +752,23 @@ public void process(
751752
if (autoUpdateSchema) {
752753
@Nullable
753754
StreamAppendClient streamAppendClient = appendClientInfo.get().getStreamAppendClient();
755+
TableSchema originalSchema = appendClientInfo.get().getTableSchema();
756+
;
754757
@Nullable
755-
TableSchema newSchema =
758+
TableSchema updatedSchemaReturned =
756759
(streamAppendClient != null) ? streamAppendClient.getUpdatedSchema() : null;
757760
// Update the table schema and clear the append client.
758-
if (newSchema != null) {
759-
appendClientInfo.set(
760-
AppendClientInfo.of(newSchema, appendClientInfo.get().getCloseAppendClient()));
761-
APPEND_CLIENTS.invalidate(element.getKey());
762-
APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get());
763-
updatedSchema.write(newSchema);
761+
if (updatedSchemaReturned != null) {
762+
Optional<TableSchema> newSchema =
763+
TableSchemaUpdateUtils.getUpdatedSchema(originalSchema, updatedSchemaReturned);
764+
if (newSchema.isPresent()) {
765+
appendClientInfo.set(
766+
AppendClientInfo.of(
767+
newSchema.get(), appendClientInfo.get().getCloseAppendClient()));
768+
APPEND_CLIENTS.invalidate(element.getKey());
769+
APPEND_CLIENTS.put(element.getKey(), appendClientInfo.get());
770+
updatedSchema.write(newSchema.get());
771+
}
764772
}
765773
}
766774

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.bigquery;
19+
20+
import com.google.auto.value.AutoValue;
21+
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
22+
import com.google.cloud.bigquery.storage.v1.TableSchema;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.Optional;
26+
import java.util.Set;
27+
import java.util.stream.Collectors;
28+
import javax.annotation.Nullable;
29+
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
30+
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
31+
32+
/** Helper utilities for handling schema-update responses. */
33+
public class TableSchemaUpdateUtils {
34+
/*
35+
Given an original schema and an updated schema, return the schema that should be used to process future records.
36+
This function behaves as follows:
37+
- If the new schema is not compatible (e.g. it is missing fields of the old schema), it returns Optional.empty().
38+
- If the new schema is equivalent (i.e. equal modulo field ordering) to the old schema, it returns
39+
Optional.empty().
40+
- Otherwise the returned schema lists the old schema's fields first, in their old order, followed by the fields
41+
that exist only in the new schema. Fields nested inside STRUCT fields are merged the same way.
42+
*/
43+
public static Optional<TableSchema> getUpdatedSchema(
44+
TableSchema oldSchema, TableSchema newSchema) {
45+
Result updatedFields = getUpdatedSchema(oldSchema.getFieldsList(), newSchema.getFieldsList());
46+
if (updatedFields.isEquivalent()) {
47+
return Optional.empty();
48+
} else {
49+
// getFields() is empty for an incompatible schema, so map() yields Optional.empty() in that case.
return updatedFields
50+
.getFields()
51+
.map(
52+
tableFieldSchemas ->
53+
TableSchema.newBuilder().addAllFields(tableFieldSchemas).build());
54+
}
55+
}
56+
57+
/**
 * Outcome of merging two field lists: the merged fields (absent when no merged list could be
 * produced) and a flag telling whether the old and new lists were equivalent.
 */
@AutoValue
58+
abstract static class Result {
59+
// Merged field list; Optional.empty() when the result was created through empty().
abstract Optional<List<TableFieldSchema>> getFields();
60+
61+
// True when the merge found the old and new field lists equal apart from field ordering.
abstract boolean isEquivalent();
62+
63+
/** Creates a result that carries the given merged fields and equivalence flag. */
static Result of(List<TableFieldSchema> fields, boolean isEquivalent) {
64+
return new AutoValue_TableSchemaUpdateUtils_Result(Optional.of(fields), isEquivalent);
65+
}
66+
67+
/** Creates a result with no fields and isEquivalent set to false. */
static Result empty() {
68+
return new AutoValue_TableSchemaUpdateUtils_Result(Optional.empty(), false);
69+
}
70+
}
71+
72+
/**
 * Merges newSchema into oldSchema, recursing into fields whose old type is STRUCT.
 *
 * @param oldSchema fields currently in use; when null, newSchema is returned unchanged and
 *     flagged as not equivalent
 * @param newSchema fields of the updated schema; when null, an empty result is returned
 * @return the old fields (taken from their new definitions, kept in old order) followed by the
 *     fields that only exist in newSchema, or an empty result when an old field is missing from
 *     newSchema at any nesting level
 */
private static Result getUpdatedSchema(
73+
@Nullable List<TableFieldSchema> oldSchema, @Nullable List<TableFieldSchema> newSchema) {
74+
if (newSchema == null) {
75+
return Result.empty();
76+
}
77+
if (oldSchema == null) {
78+
return Result.of(newSchema, false);
79+
}
80+
81+
// Index the new fields by name so each old field can be looked up while walking the old order.
// Note: the two-argument Collectors.toMap throws IllegalStateException on duplicate names.
Map<String, TableFieldSchema> newSchemaMap =
82+
newSchema.stream().collect(Collectors.toMap(TableFieldSchema::getName, x -> x));
83+
Set<String> fieldNamesPopulated = Sets.newHashSet();
84+
List<TableFieldSchema> updatedSchema = Lists.newArrayList();
85+
// Lists of different sizes are never equivalent; the per-field checks below can only turn this
// flag from true to false.
boolean isEquivalent = oldSchema.size() == newSchema.size();
86+
for (TableFieldSchema tableFieldSchema : oldSchema) {
87+
@Nullable TableFieldSchema newTableFieldSchema = newSchemaMap.get(tableFieldSchema.getName());
88+
if (newTableFieldSchema == null) {
89+
// We don't support deleting fields!
90+
return Result.empty();
91+
}
92+
// Start from the new field definition but drop its nested fields; for STRUCTs they are re-added
// below in merged order.
TableFieldSchema.Builder updatedTableFieldSchema = newTableFieldSchema.toBuilder();
93+
updatedTableFieldSchema.clearFields();
94+
if (tableFieldSchema.getType().equals(TableFieldSchema.Type.STRUCT)) {
95+
Result updatedTableFields =
96+
getUpdatedSchema(tableFieldSchema.getFieldsList(), newTableFieldSchema.getFieldsList());
97+
// An incompatible nested struct makes the whole schema incompatible.
if (!updatedTableFields.getFields().isPresent()) {
98+
return updatedTableFields;
99+
}
100+
updatedTableFieldSchema.addAllFields(updatedTableFields.getFields().get());
101+
isEquivalent = isEquivalent && updatedTableFields.isEquivalent();
102+
// Compare the struct fields themselves with their nested fields stripped; the nested fields
// were already compared by the recursive call above.
isEquivalent =
103+
isEquivalent
104+
&& tableFieldSchema
105+
.toBuilder()
106+
.clearFields()
107+
.build()
108+
.equals(newTableFieldSchema.toBuilder().clearFields().build());
109+
} else {
110+
// Non-struct fields must match their new definition exactly to count as equivalent.
isEquivalent = isEquivalent && tableFieldSchema.equals(newTableFieldSchema);
111+
}
112+
updatedSchema.add(updatedTableFieldSchema.build());
113+
// Remember the name so this field is not appended a second time below.
fieldNamesPopulated.add(updatedTableFieldSchema.getName());
114+
}
115+
116+
// Add in new fields at the end of the schema.
117+
newSchema.stream()
118+
.filter(f -> !fieldNamesPopulated.contains(f.getName()))
119+
.forEach(updatedSchema::add);
120+
return Result.of(updatedSchema, isEquivalent);
121+
}
122+
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1985,9 +1985,13 @@ public void updateTableSchemaTest(boolean useSet) throws Exception {
19851985
new TableSchema()
19861986
.setFields(
19871987
ImmutableList.of(
1988-
new TableFieldSchema().setName("name").setType("STRING"),
19891988
new TableFieldSchema().setName("number").setType("INTEGER"),
1989+
new TableFieldSchema().setName("name").setType("STRING"),
19901990
new TableFieldSchema().setName("req").setType("STRING").setMode("REQUIRED")));
1991+
1992+
// Add new fields to the updated schema. Also reorder some existing fields to validate that
1993+
// field reordering is handled
1994+
// correctly during a schema update.
19911995
TableSchema tableSchemaUpdated =
19921996
new TableSchema()
19931997
.setFields(
@@ -2018,8 +2022,8 @@ public void updateTableSchemaTest(boolean useSet) throws Exception {
20182022
new TableRow()
20192023
.setF(
20202024
ImmutableList.of(
2021-
new TableCell().setV("name" + i),
20222025
new TableCell().setV(Long.toString(i)),
2026+
new TableCell().setV("name" + i),
20232027
new TableCell().setV(i > 5 ? null : "foo"),
20242028
new TableCell().setV(Long.toString(i * 2))));
20252029

0 commit comments

Comments
 (0)