From d23c095d61bb8fb065e0f6071eb95bfe7733546f Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 30 Nov 2025 18:33:57 -0500 Subject: [PATCH 1/7] initial --- .../sdk/io/gcp/bigquery/BigQueryUtils.java | 63 ++- .../bigquery/TableRowToStorageApiProto.java | 101 +++- .../bigquery/BigQueryTimestampPicosIT.java | 430 ++++++++++++++++++ .../io/gcp/bigquery/BigQueryUtilsTest.java | 94 ++++ .../TableRowToStorageApiProtoTest.java | 142 +++++- 5 files changed, 820 insertions(+), 10 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index f9a59ba089c4..f751875b5cb0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -101,7 +101,7 @@ public class BigQueryUtils { private static final Pattern TABLE_RESOURCE_PATTERN = Pattern.compile( "^projects/(?[^/]+)/datasets/(?[^/]+)/tables/(?[^/]+)$"); - +ig // For parsing the format used to refer to tables parameters in BigQueryIO. // "{project_id}:{dataset_id}.{table_id}" or // "{project_id}.{dataset_id}.{table_id}" @@ -380,6 +380,67 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) { return ret; } + /** + * Represents a timestamp with picosecond precision, split into seconds and picoseconds + * components. + */ + public static class TimestampPicos { + final long seconds; + final long picoseconds; + + TimestampPicos(long seconds, long picoseconds) { + this.seconds = seconds; + this.picoseconds = picoseconds; + } + } + + /** + * Parses a timestamp string into seconds and picoseconds components. + * + *

<p>Handles two formats:
+   *
+   * <ul>
+   *   <li>ISO format with exactly 12 fractional digits ending in Z (picosecond precision): e.g.,
+   *       "2024-01-15T10:30:45.123456789012Z"
+   *   <li>UTC format with 0-9 fractional digits ending in "UTC" (up to nanosecond precision): e.g.,
+   *       "2024-01-15 10:30:45.123456789 UTC", "2024-01-15 10:30:45 UTC"
+   * </ul>
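+   *
+   * <p>Illustrative usage (a sketch; the expected values follow directly from the formats above):
+   *
+   * <pre>{@code
+   * BigQueryUtils.TimestampPicos p =
+   *     BigQueryUtils.parseTimestampPicosFromString("2024-01-15T10:30:45.123456789012Z");
+   * // p.seconds == 1705314645L (epoch seconds of 2024-01-15T10:30:45Z)
+   * // p.picoseconds == 123456789012L
+   *
+   * BigQueryUtils.TimestampPicos q =
+   *     BigQueryUtils.parseTimestampPicosFromString("2024-01-15 10:30:45.123 UTC");
+   * // q.seconds == 1705314645L
+   * // q.picoseconds == 123000000000L
+   * }</pre>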
+ */ + public static TimestampPicos parseTimestampPicosFromString(String timestampString) { + // Check for ISO picosecond format up to 12 fractional digits before Z + // Format: "2024-01-15T10:30:45.123456789012Z" + if (timestampString.endsWith("Z")) { + int dotIndex = timestampString.lastIndexOf('.'); + + if (dotIndex > 0) { + String fractionalPart = + timestampString.substring(dotIndex + 1, timestampString.length() - 1); + + if (fractionalPart.length() == 12) { + // ISO timestamp with 12 decimal digits (picosecond precision) + // Parse the datetime part (without fractional seconds) + String dateTimePart = timestampString.substring(0, dotIndex) + "Z"; + java.time.Instant baseInstant = java.time.Instant.parse(dateTimePart); + + // Parse all 12 digits directly as picoseconds (subsecond portion) + long picoseconds = Long.parseLong(fractionalPart); + + return new TimestampPicos(baseInstant.getEpochSecond(), picoseconds); + } + } + + // ISO format with 0-9 fractional digits - Instant.parse handles this + java.time.Instant timestamp = java.time.Instant.parse(timestampString); + return new TimestampPicos(timestamp.getEpochSecond(), timestamp.getNano() * 1000L); + } + + // UTC format: "2024-01-15 10:30:45.123456789 UTC" + // Use TIMESTAMP_FORMATTER which handles space separator and "UTC" suffix + java.time.Instant timestamp = + java.time.Instant.from(TIMESTAMP_FORMATTER.parse(timestampString)); + return new TimestampPicos(timestamp.getEpochSecond(), timestamp.getNano() * 1000L); + } + /** * Get the Beam {@link FieldType} from a BigQuery type name. * diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java index c5451b04a4b2..57dfb8f0f61b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java @@ -191,6 +191,23 @@ private static String getPrettyFieldName(String fullName) { .put(TableFieldSchema.Type.JSON, "JSON") .build(); + static final DescriptorProto TIMESTAMP_PICOS_DESCRIPTOR_PROTO = + DescriptorProto.newBuilder() + .setName("TimestampPicos") + .addField( + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName("seconds") + .setNumber(1) + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64) + .build()) + .addField( + DescriptorProtos.FieldDescriptorProto.newBuilder() + .setName("picoseconds") + .setNumber(2) + .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64) + .build()) + .build(); + @FunctionalInterface public interface ThrowingBiFunction { OutputT apply(FirstInputT t, SecondInputT u) throws SchemaConversionException; @@ -533,6 +550,9 @@ public static TableFieldSchema tableFieldToProtoTableField( if (field.getScale() != null) { builder.setScale(field.getScale()); } + if (field.getTimestampPrecision() != null) { + builder.getTimestampPrecisionBuilder().setValue(field.getTimestampPrecision()); + } builder.setType(typeToProtoType(field.getType())); if (builder.getType().equals(TableFieldSchema.Type.STRUCT)) { for (com.google.api.services.bigquery.model.TableFieldSchema subField : field.getFields()) { @@ -587,6 +607,10 @@ public boolean isRepeated() { return tableFieldSchema.getMode().equals(TableFieldSchema.Mode.REPEATED); } + public long getTimestampPrecision() { + return 
tableFieldSchema.getTimestampPrecision().getValue(); + } + public SchemaInformation getSchemaForField(String name) { SchemaInformation schemaInformation = subFieldsByName.get(name.toLowerCase()); if (schemaInformation == null) { @@ -631,7 +655,6 @@ static SchemaInformation fromTableSchema( .put(TableFieldSchema.Type.DATE, Type.TYPE_INT32) .put(TableFieldSchema.Type.TIME, Type.TYPE_INT64) .put(TableFieldSchema.Type.DATETIME, Type.TYPE_INT64) - .put(TableFieldSchema.Type.TIMESTAMP, Type.TYPE_INT64) .put(TableFieldSchema.Type.JSON, Type.TYPE_STRING) .build(); @@ -957,10 +980,16 @@ static TableFieldSchema tableFieldSchemaFromDescriptorField(FieldDescriptor fiel switch (fieldDescriptor.getType()) { case MESSAGE: - tableFieldSchemaBuilder = tableFieldSchemaBuilder.setType(TableFieldSchema.Type.STRUCT); - TableSchema nestedTableField = tableSchemaFromDescriptor(fieldDescriptor.getMessageType()); - tableFieldSchemaBuilder = - tableFieldSchemaBuilder.addAllFields(nestedTableField.getFieldsList()); + if (fieldDescriptor.getMessageType().getName().equals("TimestampPicos")) { + tableFieldSchemaBuilder.setType(TableFieldSchema.Type.TIMESTAMP); + tableFieldSchemaBuilder.setPrecision(12); + } else { + tableFieldSchemaBuilder = tableFieldSchemaBuilder.setType(TableFieldSchema.Type.STRUCT); + TableSchema nestedTableField = + tableSchemaFromDescriptor(fieldDescriptor.getMessageType()); + tableFieldSchemaBuilder = + tableFieldSchemaBuilder.addAllFields(nestedTableField.getFieldsList()); + } break; default: TableFieldSchema.Type type = PRIMITIVE_TYPES_PROTO_TO_BQ.get(fieldDescriptor.getType()); @@ -1060,6 +1089,25 @@ private static void fieldDescriptorFromTableField( fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName()); break; + case TIMESTAMP: + if (fieldSchema.getTimestampPrecision().getValue() == 12) { + boolean typeAlreadyExists = + descriptorBuilder.getNestedTypeList().stream() + .anyMatch(d -> TIMESTAMP_PICOS_DESCRIPTOR_PROTO.getName().equals(d.getName())); + + if (!typeAlreadyExists) { + descriptorBuilder.addNestedType(TIMESTAMP_PICOS_DESCRIPTOR_PROTO); + } + fieldDescriptorBuilder = + fieldDescriptorBuilder + .setType(Type.TYPE_MESSAGE) + .setTypeName(TIMESTAMP_PICOS_DESCRIPTOR_PROTO.getName()); + } else { + // Microsecond precision - use simple INT64 + fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64); + } + break; + default: @Nullable Type type = PRIMITIVE_TYPES_BQ_TO_PROTO.get(fieldSchema.getType()); if (type == null) { @@ -1313,6 +1361,34 @@ public static ByteString mergeNewFields( null, null); } + } else if (schemaInformation.getType() == TableFieldSchema.Type.TIMESTAMP + && schemaInformation.getTimestampPrecision() == 12) { + + long seconds; + long picoseconds; + + if (value instanceof String) { + BigQueryUtils.TimestampPicos parsed = + BigQueryUtils.parseTimestampPicosFromString((String) value); + seconds = parsed.seconds; + picoseconds = parsed.picoseconds; + + } else if (value instanceof Instant) { + Instant timestamp = (Instant) value; + seconds = timestamp.getEpochSecond(); + picoseconds = timestamp.getNano() * 1000L; + } else { + throw new IllegalArgumentException( + "Unsupported timestamp value type: " + value.getClass().getName()); + } + + converted = + DynamicMessage.newBuilder(fieldDescriptor.getMessageType()) + .setField(fieldDescriptor.getMessageType().findFieldByName("seconds"), seconds) + .setField( + fieldDescriptor.getMessageType().findFieldByName("picoseconds"), picoseconds) + .build(); + } else { 
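+        // All other values (including microsecond-precision timestamps) fall through to the
+        // existing per-type converter lookup below.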
@Nullable ThrowingBiFunction converter = @@ -1633,6 +1709,7 @@ public static Object jsonValueFromMessageValue( return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER); } else if (fieldDescriptor.getType().equals(FieldDescriptor.Type.MESSAGE)) { Message message = (Message) fieldValue; + String messageName = fieldDescriptor.getMessageType().getName(); if (TIMESTAMP_VALUE_DESCRIPTOR_NAMES.contains( fieldDescriptor.getMessageType().getName())) { Descriptor descriptor = message.getDescriptorForType(); @@ -1640,6 +1717,20 @@ public static Object jsonValueFromMessageValue( int nanos = (int) message.getField(descriptor.findFieldByName("nanos")); Instant instant = Instant.ofEpochSecond(seconds, nanos); return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER); + } else if (messageName.equals("TimestampPicos")) { + Descriptor descriptor = message.getDescriptorForType(); + long seconds = (long) message.getField(descriptor.findFieldByName("seconds")); + long picoseconds = (long) message.getField(descriptor.findFieldByName("picoseconds")); + + // Convert to ISO timestamp string with picoseconds + Instant instant = Instant.ofEpochSecond(seconds); + String baseTimestamp = instant.toString(); // "2024-01-15T10:30:45Z" + + // Format picoseconds as 12-digit string + String picosPart = String.format("%012d", picoseconds); + + // Insert before 'Z': "2024-01-15T10:30:45Z" → "2024-01-15T10:30:45.123456789012Z" + return baseTimestamp.replace("Z", "." + picosPart + "Z"); } else { throw new RuntimeException( "Not implemented yet " + fieldDescriptor.getMessageType().getFullName()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java new file mode 100644 index 000000000000..2e71c473631a --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.bigquery.storage.v1.DataFormat; +import java.security.SecureRandom; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Integration tests for BigQuery TIMESTAMP with various precisions. + * + *

<p>Tests write timestamps via Storage Write API and read back using different format (AVRO/ARROW)
+ * and precision (PICOS/NANOS/MICROS) combinations.
+ *
+ * <p>Two tables are used:
+ *
+ * <ul>
+ *   <li>FULL_RANGE: dates from 0001-01-01 to 9999-12-31 (for PICOS and MICROS reads)
+ *   <li>NANOS_RANGE: dates within int64 nanos bounds ~1678 to ~2261 (for NANOS reads, which use
+ *       Avro/Arrow int64 logical types that overflow outside this range)
+ * </ul>
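+ *
+ * <p>For context: Long.MAX_VALUE nanoseconds is 9223372036854775807 ns, roughly 292.5 years, so
+ * int64 nanos-since-epoch spans approximately 1677-09-21T00:12:43.145224192Z through
+ * 2262-04-11T23:47:16.854775807Z; the NANOS_RANGE rows stay about a year inside those limits.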
+ */ +@RunWith(JUnit4.class) +public class BigQueryTimestampPicosIT { + + private static String project; + private static final String DATASET_ID = + "bq_timestamp_picos_it_" + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32); + + private static TestBigQueryOptions bqOptions; + private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryTimestampPicosIT"); + + private static String fullRangeTableSpec; + private static String nanosRangeTableSpec; + private static final String FULL_RANGE_TABLE = "timestamp_full_range"; + private static final String NANOS_RANGE_TABLE = "timestamp_nanos_range"; + + // TEST DATA + // + // Tables have 4 timestamp columns written at different precisions: + // ts_picos - written with 12 fractional digits (picoseconds) + // ts_nanos - written with 9 fractional digits (nanoseconds) + // ts_micros - written with 6 fractional digits (microseconds) + // ts_millis - written with 3 fractional digits (milliseconds) + + /* + * FULL_RANGE table - for PICOS and MICROS reads + * Contains dates outside int64 nanos bounds (0001 and 9999) + */ + private static final List FULL_RANGE_WRITE = + ImmutableList.of( + row( + "2024-01-15T10:30:45.123456789012Z", + "2024-01-15 10:30:45.123456789 UTC", + "2024-01-15 10:30:45.123456 UTC", + "2024-01-15 10:30:45.123 UTC"), + row( + "2024-06-20T15:45:30.987654321098Z", + "2024-06-20 15:45:30.987654321 UTC", + "2024-06-20 15:45:30.987654 UTC", + "2024-06-20 15:45:30.987 UTC"), + row( + "0001-01-01T00:00:00.111222333444Z", + "0001-01-01 00:00:00.111222333 UTC", + "0001-01-01 00:00:00.111222 UTC", + "0001-01-01 00:00:00.111 UTC"), + row( + "1970-01-01T00:00:00.555666777888Z", + "1970-01-01 00:00:00.555666777 UTC", + "1970-01-01 00:00:00.555666 UTC", + "1970-01-01 00:00:00.555 UTC"), + row( + "9999-12-31T23:59:59.999888777666Z", + "9999-12-31 23:59:59.999888777 UTC", + "9999-12-31 23:59:59.999888 UTC", + "9999-12-31 23:59:59.999 UTC")); + + /* + * NANOS_RANGE table - for NANOS reads + * Only dates within int64 nanos-since-epoch bounds (~1678 to ~2261) + */ + private static final List NANOS_RANGE_WRITE = + ImmutableList.of( + row( + "2024-01-15T10:30:45.123456789012Z", + "2024-01-15 10:30:45.123456789 UTC", + "2024-01-15 10:30:45.123456 UTC", + "2024-01-15 10:30:45.123 UTC"), + row( + "2024-06-20T15:45:30.987654321098Z", + "2024-06-20 15:45:30.987654321 UTC", + "2024-06-20 15:45:30.987654 UTC", + "2024-06-20 15:45:30.987 UTC"), + row( + "1678-09-21T00:12:43.145224192555Z", + "1678-09-21 00:12:43.145224192 UTC", + "1678-09-21 00:12:43.145224 UTC", + "1678-09-21 00:12:43.145 UTC"), + row( + "1970-01-01T00:00:00.555666777888Z", + "1970-01-01 00:00:00.555666777 UTC", + "1970-01-01 00:00:00.555666 UTC", + "1970-01-01 00:00:00.555 UTC"), + row( + "2261-04-11T23:47:16.854775807333Z", + "2261-04-11 23:47:16.854775807 UTC", + "2261-04-11 23:47:16.854775 UTC", + "2261-04-11 23:47:16.854 UTC")); + + private static final List FULL_RANGE_READ_PICOS = + ImmutableList.of( + row( + "2024-01-15T10:30:45.123456789012Z", + "2024-01-15T10:30:45.123456789000Z", + "2024-01-15T10:30:45.123456000000Z", + "2024-01-15T10:30:45.123000000000Z"), + row( + "2024-06-20T15:45:30.987654321098Z", + "2024-06-20T15:45:30.987654321000Z", + "2024-06-20T15:45:30.987654000000Z", + "2024-06-20T15:45:30.987000000000Z"), + row( + "0001-01-01T00:00:00.111222333444Z", + "0001-01-01T00:00:00.111222333000Z", + "0001-01-01T00:00:00.111222000000Z", + "0001-01-01T00:00:00.111000000000Z"), + row( + "1970-01-01T00:00:00.555666777888Z", + 
"1970-01-01T00:00:00.555666777000Z", + "1970-01-01T00:00:00.555666000000Z", + "1970-01-01T00:00:00.555000000000Z"), + row( + "9999-12-31T23:59:59.999888777666Z", + "9999-12-31T23:59:59.999888777000Z", + "9999-12-31T23:59:59.999888000000Z", + "9999-12-31T23:59:59.999000000000Z")); + + private static final List NANOS_RANGE_READ_NANOS = + ImmutableList.of( + row( + "2024-01-15 10:30:45.123456789 UTC", + "2024-01-15 10:30:45.123456789 UTC", + "2024-01-15 10:30:45.123456 UTC", + "2024-01-15 10:30:45.123 UTC"), + row( + "2024-06-20 15:45:30.987654321 UTC", + "2024-06-20 15:45:30.987654321 UTC", + "2024-06-20 15:45:30.987654 UTC", + "2024-06-20 15:45:30.987 UTC"), + row( + "1678-09-21 00:12:43.145224192 UTC", + "1678-09-21 00:12:43.145224192 UTC", + "1678-09-21 00:12:43.145224 UTC", + "1678-09-21 00:12:43.145 UTC"), + row( + "1970-01-01 00:00:00.555666777 UTC", + "1970-01-01 00:00:00.555666777 UTC", + "1970-01-01 00:00:00.555666 UTC", + "1970-01-01 00:00:00.555 UTC"), + row( + "2261-04-11 23:47:16.854775807 UTC", + "2261-04-11 23:47:16.854775807 UTC", + "2261-04-11 23:47:16.854775 UTC", + "2261-04-11 23:47:16.854 UTC")); + + private static final List FULL_RANGE_READ_MICROS = + ImmutableList.of( + row( + "2024-01-15 10:30:45.123456 UTC", + "2024-01-15 10:30:45.123456 UTC", + "2024-01-15 10:30:45.123456 UTC", + "2024-01-15 10:30:45.123 UTC"), + row( + "2024-06-20 15:45:30.987654 UTC", + "2024-06-20 15:45:30.987654 UTC", + "2024-06-20 15:45:30.987654 UTC", + "2024-06-20 15:45:30.987 UTC"), + row( + "0001-01-01 00:00:00.111222 UTC", + "0001-01-01 00:00:00.111222 UTC", + "0001-01-01 00:00:00.111222 UTC", + "0001-01-01 00:00:00.111 UTC"), + row( + "1970-01-01 00:00:00.555666 UTC", + "1970-01-01 00:00:00.555666 UTC", + "1970-01-01 00:00:00.555666 UTC", + "1970-01-01 00:00:00.555 UTC"), + row( + "9999-12-31 23:59:59.999888 UTC", + "9999-12-31 23:59:59.999888 UTC", + "9999-12-31 23:59:59.999888 UTC", + "9999-12-31 23:59:59.999 UTC")); + + private static final List FULL_RANGE_READ_MICROS_ARROW = + ImmutableList.of( + row( + "2024-01-15 10:30:45.123 UTC", + "2024-01-15 10:30:45.123 UTC", + "2024-01-15 10:30:45.123 UTC", + "2024-01-15 10:30:45.123 UTC"), + row( + "2024-06-20 15:45:30.987 UTC", + "2024-06-20 15:45:30.987 UTC", + "2024-06-20 15:45:30.987 UTC", + "2024-06-20 15:45:30.987 UTC"), + row( + "0001-01-01 00:00:00.111 UTC", + "0001-01-01 00:00:00.111 UTC", + "0001-01-01 00:00:00.111 UTC", + "0001-01-01 00:00:00.111 UTC"), + row( + "1970-01-01 00:00:00.555 UTC", + "1970-01-01 00:00:00.555 UTC", + "1970-01-01 00:00:00.555 UTC", + "1970-01-01 00:00:00.555 UTC"), + row( + "9999-12-31 23:59:59.999 UTC", + "9999-12-31 23:59:59.999 UTC", + "9999-12-31 23:59:59.999 UTC", + "9999-12-31 23:59:59.999 UTC")); + + private static TableRow row(String picos, String nanos, String micros, String millis) { + return new TableRow() + .set("ts_picos", picos) + .set("ts_nanos", nanos) + .set("ts_micros", micros) + .set("ts_millis", millis); + } + + @BeforeClass + public static void setup() throws Exception { + bqOptions = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class); + project = bqOptions.as(GcpOptions.class).getProject(); + + BQ_CLIENT.createNewDataset(project, DATASET_ID, null, "us-central1"); + + fullRangeTableSpec = String.format("%s:%s.%s", project, DATASET_ID, FULL_RANGE_TABLE); + nanosRangeTableSpec = String.format("%s:%s.%s", project, DATASET_ID, NANOS_RANGE_TABLE); + + // Write full range table + Pipeline writePipeline1 = Pipeline.create(bqOptions); + writePipeline1 + 
.apply("CreateFullRange", Create.of(FULL_RANGE_WRITE)) + .apply( + "WriteFullRange", + BigQueryIO.writeTableRows() + .to(fullRangeTableSpec) + .withSchema(createSchema()) + .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); + writePipeline1.run().waitUntilFinish(); + + // Write nanos range table + Pipeline writePipeline2 = Pipeline.create(bqOptions); + writePipeline2 + .apply("CreateNanosRange", Create.of(NANOS_RANGE_WRITE)) + .apply( + "WriteNanosRange", + BigQueryIO.writeTableRows() + .to(nanosRangeTableSpec) + .withSchema(createSchema()) + .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); + writePipeline2.run().waitUntilFinish(); + } + + @AfterClass + public static void cleanup() { + BQ_CLIENT.deleteDataset(project, DATASET_ID); + } + + private static TableSchema createSchema() { + return new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema() + .setName("ts_picos") + .setType("TIMESTAMP") + .setTimestampPrecision(12L), + new TableFieldSchema() + .setName("ts_nanos") + .setType("TIMESTAMP") + .setTimestampPrecision(12L), + new TableFieldSchema() + .setName("ts_micros") + .setType("TIMESTAMP") + .setTimestampPrecision(12L), + new TableFieldSchema() + .setName("ts_millis") + .setType("TIMESTAMP") + .setTimestampPrecision(12L))); + } + + private void runReadTest( + TimestampPrecision precision, + DataFormat format, + String tableSpec, + String tableName, + List expectedRows) { + + Pipeline readPipeline = Pipeline.create(bqOptions); + + PCollection fromTable = + readPipeline.apply( + "ReadFromTable", + BigQueryIO.readTableRows() + .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) + .withFormat(format) + .withDirectReadPicosTimestampPrecision(precision) + .from(tableSpec)); + PCollection fromTableWithSchema = + readPipeline.apply( + "ReadFromTableWithSchema", + BigQueryIO.readTableRowsWithSchema() + .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) + .withFormat(format) + .withDirectReadPicosTimestampPrecision(precision) + .from(tableSpec)); + + PCollection fromQuery = + readPipeline.apply( + "ReadFromQuery", + BigQueryIO.readTableRows() + .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) + .fromQuery(String.format("SELECT * FROM %s.%s.%s", project, DATASET_ID, tableName)) + .usingStandardSql() + .withFormat(format) + .withDirectReadPicosTimestampPrecision(precision)); + + PAssert.that(fromTable).containsInAnyOrder(expectedRows); + PAssert.that(fromQuery).containsInAnyOrder(expectedRows); + PAssert.that(fromTableWithSchema).containsInAnyOrder(expectedRows); + + readPipeline.run().waitUntilFinish(); + } + + @Test + public void testRead_Picos_Avro() { + runReadTest( + TimestampPrecision.PICOS, + DataFormat.AVRO, + fullRangeTableSpec, + FULL_RANGE_TABLE, + FULL_RANGE_READ_PICOS); + } + + @Test + public void testRead_Picos_Arrow() { + runReadTest( + TimestampPrecision.PICOS, + DataFormat.ARROW, + fullRangeTableSpec, + FULL_RANGE_TABLE, + FULL_RANGE_READ_PICOS); + } + + @Test + public void testRead_Nanos_Avro() { + runReadTest( + TimestampPrecision.NANOS, + DataFormat.AVRO, + nanosRangeTableSpec, + NANOS_RANGE_TABLE, + NANOS_RANGE_READ_NANOS); + } + + @Test + public void testRead_Nanos_Arrow() { + runReadTest( + TimestampPrecision.NANOS, + 
DataFormat.ARROW, + nanosRangeTableSpec, + NANOS_RANGE_TABLE, + NANOS_RANGE_READ_NANOS); + } + + @Test + public void testRead_Micros_Avro() { + runReadTest( + TimestampPrecision.MICROS, + DataFormat.AVRO, + fullRangeTableSpec, + FULL_RANGE_TABLE, + FULL_RANGE_READ_MICROS); + } + + @Test + public void testRead_Micros_Arrow() { + // Known issue: Arrow MICROS truncates to milliseconds + runReadTest( + TimestampPrecision.MICROS, + DataFormat.ARROW, + fullRangeTableSpec, + FULL_RANGE_TABLE, + FULL_RANGE_READ_MICROS_ARROW); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index 675885b4e942..f929c47af36e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java @@ -1464,4 +1464,98 @@ public void testToBeamRow_timestampNanos_variablePrecision() { schema, new TableRow().set("ts", "2024-08-10 16:52:07.123456789 UTC")); assertEquals(123456789, ((java.time.Instant) row9.getValue("ts")).getNano()); } + + /** Computes expected epoch seconds from an ISO-8601 timestamp. */ + private static long expectedSeconds(String isoTimestamp) { + return java.time.Instant.parse(isoTimestamp).getEpochSecond(); + } + + @Test + public void testParseTimestampPicosFromString() { + // Format: {input, isoEquivalentForSeconds, expectedPicoseconds, description} + Object[][] testCases = { + // UTC format tests (space separator, "UTC" suffix) + {"2024-01-15 10:30:45 UTC", "2024-01-15T10:30:45Z", 0L, "UTC no fractional"}, + {"2024-01-15 10:30:45.123 UTC", "2024-01-15T10:30:45Z", 123_000_000_000L, "UTC 3 digits"}, + {"2024-01-15 10:30:45.123456 UTC", "2024-01-15T10:30:45Z", 123_456_000_000L, "UTC 6 digits"}, + { + "2024-01-15 10:30:45.123456789 UTC", + "2024-01-15T10:30:45Z", + 123_456_789_000L, + "UTC 9 digits" + }, + + // ISO format tests (T separator, "Z" suffix) + {"2024-01-15T10:30:45Z", "2024-01-15T10:30:45Z", 0L, "ISO no fractional"}, + {"2024-01-15T10:30:45.123Z", "2024-01-15T10:30:45Z", 123_000_000_000L, "ISO 3 digits"}, + {"2024-01-15T10:30:45.123456Z", "2024-01-15T10:30:45Z", 123_456_000_000L, "ISO 6 digits"}, + {"2024-01-15T10:30:45.123456789Z", "2024-01-15T10:30:45Z", 123_456_789_000L, "ISO 9 digits"}, + { + "2024-01-15T10:30:45.123456789012Z", + "2024-01-15T10:30:45Z", + 123_456_789_012L, + "ISO 12 digits (picos)" + }, + + // Boundary: earliest date (0001-01-01) + {"0001-01-01 00:00:00.000000 UTC", "0001-01-01T00:00:00Z", 0L, "Earliest UTC"}, + {"0001-01-01T00:00:00Z", "0001-01-01T00:00:00Z", 0L, "Earliest ISO"}, + {"0001-01-01T00:00:00.000000000001Z", "0001-01-01T00:00:00Z", 1L, "Earliest ISO 1 pico"}, + + // Boundary: latest date (9999-12-31) + {"9999-12-31 23:59:59.999999 UTC", "9999-12-31T23:59:59Z", 999_999_000_000L, "Latest UTC"}, + { + "9999-12-31T23:59:59.999999999Z", + "9999-12-31T23:59:59Z", + 999_999_999_000L, + "Latest ISO 9 digits" + }, + { + "9999-12-31T23:59:59.999999999999Z", + "9999-12-31T23:59:59Z", + 999_999_999_999L, + "Latest ISO max picos" + }, + + // Unix epoch (1970-01-01) + {"1970-01-01 00:00:00 UTC", "1970-01-01T00:00:00Z", 0L, "Epoch UTC"}, + {"1970-01-01T00:00:00Z", "1970-01-01T00:00:00Z", 0L, "Epoch ISO"}, + {"1970-01-01T00:00:00.000000000001Z", "1970-01-01T00:00:00Z", 1L, "Epoch + 1 pico"}, + + // Fractional boundaries + 
{"2024-01-15T10:30:45.000000000000Z", "2024-01-15T10:30:45Z", 0L, "All zeros picos"}, + { + "2024-01-15T10:30:45.999999999999Z", + "2024-01-15T10:30:45Z", + 999_999_999_999L, + "All nines picos" + }, + { + "2024-01-15T10:30:45.1Z", + "2024-01-15T10:30:45Z", + 100_000_000_000L, + "Single digit fractional" + }, + }; + + for (Object[] testCase : testCases) { + String input = (String) testCase[0]; + String isoEquivalent = (String) testCase[1]; + long expectedPicos = (Long) testCase[2]; + String description = (String) testCase[3]; + + long expectedSecs = expectedSeconds(isoEquivalent); + + BigQueryUtils.TimestampPicos result = BigQueryUtils.parseTimestampPicosFromString(input); + + assertEquals( + String.format("Seconds mismatch for '%s' (%s)", input, description), + expectedSecs, + result.seconds); + assertEquals( + String.format("Picoseconds mismatch for '%s' (%s)", input, description), + expectedPicos, + result.picoseconds); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java index 05f0e9c993c0..ea3bb29e0815 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java @@ -42,6 +42,7 @@ import com.google.protobuf.Descriptors.DescriptorValidationException; import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.DynamicMessage; +import com.google.protobuf.Int64Value; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.charset.StandardCharsets; @@ -52,6 +53,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -131,6 +133,11 @@ public class TableRowToStorageApiProtoTest { .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampvaluemaximum")) .add( new TableFieldSchema().setType("STRING").setName("123_illegalprotofieldname")) + .add( + new TableFieldSchema() + .setType("TIMESTAMP") + .setName("timestamppicosvalue") + .setTimestampPrecision(12L)) .build()); private static final TableSchema BASE_TABLE_SCHEMA_NO_F = @@ -183,6 +190,11 @@ public class TableRowToStorageApiProtoTest { .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampvaluemaximum")) .add( new TableFieldSchema().setType("STRING").setName("123_illegalprotofieldname")) + .add( + new TableFieldSchema() + .setType("TIMESTAMP") + .setName("timestamppicosvalue") + .setTimestampPrecision(12L)) .build()); private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO_DESCRIPTOR = @@ -396,6 +408,14 @@ public class TableRowToStorageApiProtoTest { AnnotationsProto.columnName.getDescriptor(), "123_illegalprotofieldname")) .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("timestamppicosvalue") + .setNumber(30) + .setType(Type.TYPE_MESSAGE) + .setLabel(Label.LABEL_OPTIONAL) + .setTypeName("TimestampPicos") + .build()) .build(); private static final com.google.cloud.bigquery.storage.v1.TableSchema BASE_TABLE_PROTO_SCHEMA = @@ -545,6 +565,12 @@ public class TableRowToStorageApiProtoTest { .setName("123_illegalprotofieldname") .setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRING) .build()) + 
.addFields( + com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder() + .setName("timestamppicosvalue") + .setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP) + .setTimestampPrecision(Int64Value.newBuilder().setValue(12L)) + .build()) .build(); private static final DescriptorProto BASE_TABLE_SCHEMA_NO_F_PROTO = @@ -751,6 +777,14 @@ public class TableRowToStorageApiProtoTest { AnnotationsProto.columnName.getDescriptor(), "123_illegalprotofieldname")) .build()) + .addField( + FieldDescriptorProto.newBuilder() + .setName("timestamppicosvalue") + .setNumber(29) + .setType(Type.TYPE_MESSAGE) + .setLabel(Label.LABEL_OPTIONAL) + .setTypeName("TimestampPicos") + .build()) .build(); private static final com.google.cloud.bigquery.storage.v1.TableSchema @@ -896,6 +930,12 @@ public class TableRowToStorageApiProtoTest { .setName("123_illegalprotofieldname") .setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRING) .build()) + .addFields( + com.google.cloud.bigquery.storage.v1.TableFieldSchema.newBuilder() + .setName("timestamppicosvalue") + .setType(com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP) + .setTimestampPrecision(Int64Value.newBuilder().setValue(12L)) + .build()) .build(); private static final TableSchema NESTED_TABLE_SCHEMA = new TableSchema() @@ -1137,6 +1177,34 @@ public void testNestedFromTableSchema() throws Exception { assertEquals(roundTripExpectedBaseTypesNoF, nestedRoundTripTypes); } + private static final DescriptorProto TIMESTAMP_PICOS_PROTO = + DescriptorProto.newBuilder() + .setName("TimestampPicos") + .addField( + FieldDescriptorProto.newBuilder() + .setName("seconds") + .setNumber(1) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL)) + .addField( + FieldDescriptorProto.newBuilder() + .setName("picoseconds") + .setNumber(2) + .setType(Type.TYPE_INT64) + .setLabel(Label.LABEL_OPTIONAL)) + .build(); + + private static final Descriptor TIMESTAMP_PICOS_DESCRIPTOR; + + static { + try { + TIMESTAMP_PICOS_DESCRIPTOR = + TableRowToStorageApiProto.wrapDescriptorProto(TIMESTAMP_PICOS_PROTO); + } catch (DescriptorValidationException e) { + throw new RuntimeException(e); + } + } + private static final List REPEATED_BYTES = ImmutableList.of( BaseEncoding.base64().encode("hello".getBytes(StandardCharsets.UTF_8)), @@ -1183,7 +1251,8 @@ public void testNestedFromTableSchema() throws Exception { new TableCell().setV("1970-01-01 00:00:00.1230"), new TableCell().setV("2019-08-16 00:52:07.123456"), new TableCell().setV("9999-12-31 23:59:59.999999Z"), - new TableCell().setV("madeit"))); + new TableCell().setV("madeit"), + new TableCell().setV("2024-01-15T10:30:45.123456789012Z"))); private static final TableRow BASE_TABLE_ROW_NO_F = new TableRow() @@ -1217,7 +1286,8 @@ public void testNestedFromTableSchema() throws Exception { .set("timestampvaluespacetrailingzero", "1970-01-01 00:00:00.1230") .set("datetimevaluespace", "2019-08-16 00:52:07.123456") .set("timestampvaluemaximum", "9999-12-31 23:59:59.999999Z") - .set("123_illegalprotofieldname", "madeit"); + .set("123_illegalprotofieldname", "madeit") + .set("timestamppicosvalue", "2024-01-15T10:30:45.123456789012Z"); private static final Map BASE_ROW_EXPECTED_PROTO_VALUES = ImmutableMap.builder() @@ -1261,6 +1331,15 @@ public void testNestedFromTableSchema() throws Exception { .put( BigQuerySchemaUtil.generatePlaceholderFieldName("123_illegalprotofieldname"), "madeit") + .put( + "timestamppicosvalue", + DynamicMessage.newBuilder(TIMESTAMP_PICOS_DESCRIPTOR) + 
.setField( + TIMESTAMP_PICOS_DESCRIPTOR.findFieldByName("seconds"), + Instant.parse("2024-01-15T10:30:45Z").getEpochSecond()) + .setField( + TIMESTAMP_PICOS_DESCRIPTOR.findFieldByName("picoseconds"), 123456789012L) + .build()) .build(); private static final Map BASE_ROW_EXPECTED_NAME_OVERRIDES = @@ -1309,6 +1388,15 @@ public void testNestedFromTableSchema() throws Exception { .put( BigQuerySchemaUtil.generatePlaceholderFieldName("123_illegalprotofieldname"), "madeit") + .put( + "timestamppicosvalue", + DynamicMessage.newBuilder(TIMESTAMP_PICOS_DESCRIPTOR) + .setField( + TIMESTAMP_PICOS_DESCRIPTOR.findFieldByName("seconds"), + Instant.parse("2024-01-15T10:30:45Z").getEpochSecond()) + .setField( + TIMESTAMP_PICOS_DESCRIPTOR.findFieldByName("picoseconds"), 123456789012L) + .build()) .build(); private static final Map BASE_ROW_NO_F_EXPECTED_NAME_OVERRIDES = @@ -1394,6 +1482,16 @@ private TableRow normalizeTableRowF( == com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.STRUCT) { return normalizeTableRow((TableRow) value, schemaInformation, outputUsingF); } else { + if (schemaInformation.getType() + == com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP) { + // Handle picosecond timestamp (12-digit precision) + if (schemaInformation.getTimestampPrecision() == 12) { + // Already a string, return as-is. + if (value instanceof String) { + return value; + } + } + } convertedValue = TYPE_MAP_PROTO_CONVERTERS.get(schemaInformation.getType()).apply("", value); switch (schemaInformation.getType()) { case BOOL: @@ -1461,8 +1559,42 @@ private void assertBaseRecord(DynamicMessage msg, boolean withF) { entry -> entry.getKey().getOptions().getExtension(AnnotationsProto.columnName))); - assertEquals( - withF ? BASE_ROW_EXPECTED_PROTO_VALUES : BASE_ROW_NO_F_EXPECTED_PROTO_VALUES, recordFields); + // Get expected values + Map expectedValues = + withF ? BASE_ROW_EXPECTED_PROTO_VALUES : BASE_ROW_NO_F_EXPECTED_PROTO_VALUES; + + // Handle timestamppicosvalue separately since DynamicMessage doesn't have proper equals() + Object actualPicos = recordFields.get("timestamppicosvalue"); + Object expectedPicos = expectedValues.get("timestamppicosvalue"); + + if (actualPicos != null && expectedPicos != null) { + // Compare DynamicMessages by their field values + DynamicMessage actualPicosMsg = (DynamicMessage) actualPicos; + DynamicMessage expectedPicosMsg = (DynamicMessage) expectedPicos; + + Descriptor actualDescriptor = actualPicosMsg.getDescriptorForType(); + + assertEquals( + "TimestampPicos seconds mismatch", + expectedPicosMsg.getField( + expectedPicosMsg.getDescriptorForType().findFieldByName("seconds")), + actualPicosMsg.getField(actualDescriptor.findFieldByName("seconds"))); + assertEquals( + "TimestampPicos picoseconds mismatch", + expectedPicosMsg.getField( + expectedPicosMsg.getDescriptorForType().findFieldByName("picoseconds")), + actualPicosMsg.getField(actualDescriptor.findFieldByName("picoseconds"))); + } + + // Remove timestamppicosvalue from both maps for remaining comparison + Map recordFieldsWithoutPicos = new HashMap<>(recordFields); + Map expectedValuesWithoutPicos = new HashMap<>(expectedValues); + recordFieldsWithoutPicos.remove("timestamppicosvalue"); + expectedValuesWithoutPicos.remove("timestamppicosvalue"); + + // Compare remaining fields + assertEquals(expectedValuesWithoutPicos, recordFieldsWithoutPicos); + assertEquals( withF ? 
BASE_ROW_EXPECTED_NAME_OVERRIDES : BASE_ROW_NO_F_EXPECTED_NAME_OVERRIDES, overriddenNames); @@ -1484,6 +1616,7 @@ public void testMessageFromTableRow() throws Exception { DynamicMessage msg = TableRowToStorageApiProto.messageFromTableRow( schemaInformation, descriptor, tableRow, false, false, null, null, -1); + assertEquals(4, msg.getAllFields().size()); Map fieldDescriptors = @@ -1511,6 +1644,7 @@ public void testTableRowFromMessageNoF() throws Exception { DynamicMessage msg = TableRowToStorageApiProto.messageFromTableRow( schemaInformation, descriptor, tableRow, false, false, null, null, -1); + TableRow recovered = TableRowToStorageApiProto.tableRowFromMessage( schemaInformation, msg, true, Predicates.alwaysTrue()); From bcad803d6e7487d9de3998852885805db9bc0c7f Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Dec 2025 08:05:33 +0200 Subject: [PATCH 2/7] typo --- .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index f751875b5cb0..ec5e3bfd2111 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -101,7 +101,7 @@ public class BigQueryUtils { private static final Pattern TABLE_RESOURCE_PATTERN = Pattern.compile( "^projects/(?[^/]+)/datasets/(?[^/]+)/tables/(?
[^/]+)$"); -ig + // For parsing the format used to refer to tables parameters in BigQueryIO. // "{project_id}:{dataset_id}.{table_id}" or // "{project_id}.{dataset_id}.{table_id}" From 85ce5523eb70510f966bbeebdf280354aa426229 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Dec 2025 08:46:54 +0200 Subject: [PATCH 3/7] test refactor. --- .../sdk/io/gcp/bigquery/BigQueryUtils.java | 4 +- .../bigquery/TableRowToStorageApiProto.java | 8 +- .../bigquery/BigQueryTimestampPicosIT.java | 732 +++++++++--------- 3 files changed, 395 insertions(+), 349 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java index ec5e3bfd2111..2ed3eab5cb7e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java @@ -115,6 +115,8 @@ public class BigQueryUtils { + "(?[a-zA-Z0-9_]{1,1024})[\\.]" + "(?
[\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}\\p{Zs}$]{1,1024})$"); + private static final long PICOSECOND_PRECISION = 12L; + /** Options for how to convert BigQuery data to Beam data. */ @AutoValue public abstract static class ConversionOptions implements Serializable { @@ -416,7 +418,7 @@ public static TimestampPicos parseTimestampPicosFromString(String timestampStrin String fractionalPart = timestampString.substring(dotIndex + 1, timestampString.length() - 1); - if (fractionalPart.length() == 12) { + if ((long) fractionalPart.length() == PICOSECOND_PRECISION) { // ISO timestamp with 12 decimal digits (picosecond precision) // Parse the datetime part (without fractional seconds) String dateTimePart = timestampString.substring(0, dotIndex) + "Z"; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java index 57dfb8f0f61b..5ab2de52648b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java @@ -216,6 +216,8 @@ public interface ThrowingBiFunction { static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("0.0###############", DecimalFormatSymbols.getInstance(Locale.ROOT)); + private static final long PICOSECOND_PRECISION = 12L; + // Map of functions to convert json values into the value expected in the Vortex proto object. static final Map> TYPE_MAP_PROTO_CONVERTERS = @@ -982,7 +984,7 @@ static TableFieldSchema tableFieldSchemaFromDescriptorField(FieldDescriptor fiel case MESSAGE: if (fieldDescriptor.getMessageType().getName().equals("TimestampPicos")) { tableFieldSchemaBuilder.setType(TableFieldSchema.Type.TIMESTAMP); - tableFieldSchemaBuilder.setPrecision(12); + tableFieldSchemaBuilder.setPrecision(PICOSECOND_PRECISION); } else { tableFieldSchemaBuilder = tableFieldSchemaBuilder.setType(TableFieldSchema.Type.STRUCT); TableSchema nestedTableField = @@ -1090,7 +1092,7 @@ private static void fieldDescriptorFromTableField( fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName()); break; case TIMESTAMP: - if (fieldSchema.getTimestampPrecision().getValue() == 12) { + if (fieldSchema.getTimestampPrecision().getValue() == PICOSECOND_PRECISION) { boolean typeAlreadyExists = descriptorBuilder.getNestedTypeList().stream() .anyMatch(d -> TIMESTAMP_PICOS_DESCRIPTOR_PROTO.getName().equals(d.getName())); @@ -1362,7 +1364,7 @@ public static ByteString mergeNewFields( null); } } else if (schemaInformation.getType() == TableFieldSchema.Type.TIMESTAMP - && schemaInformation.getTimestampPrecision() == 12) { + && schemaInformation.getTimestampPrecision() == PICOSECOND_PRECISION) { long seconds; long picoseconds; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java index 2e71c473631a..d956a98c4bbb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java @@ -38,263 +38,170 @@ import org.junit.runners.JUnit4; /** - * Integration tests for 
BigQuery TIMESTAMP with various precisions.
+ * Integration tests for BigQuery TIMESTAMP with picosecond precision.
  *
- * <p>Tests write timestamps via Storage Write API and read back using different format (AVRO/ARROW)
- * and precision (PICOS/NANOS/MICROS) combinations.
- *
- * <p>Two tables are used:
- *
- * <ul>
- *   <li>FULL_RANGE: dates from 0001-01-01 to 9999-12-31 (for PICOS and MICROS reads)
- *   <li>NANOS_RANGE: dates within int64 nanos bounds ~1678 to ~2261 (for NANOS reads, which use
- *       Avro/Arrow int64 logical types that overflow outside this range)
- * </ul>
+ * <p>
Tests write data via Storage Write API and read back using different precision settings. Each + * test clearly shows: WRITE DATA → READ SETTINGS → EXPECTED OUTPUT. */ @RunWith(JUnit4.class) public class BigQueryTimestampPicosIT { + private static final long PICOS_PRECISION = 12L; + private static String project; private static final String DATASET_ID = - "bq_timestamp_picos_it_" + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32); - - private static TestBigQueryOptions bqOptions; + "bq_ts_picos_" + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32); private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryTimestampPicosIT"); + private static TestBigQueryOptions bqOptions; + private static String tableSpec; - private static String fullRangeTableSpec; - private static String nanosRangeTableSpec; - private static final String FULL_RANGE_TABLE = "timestamp_full_range"; - private static final String NANOS_RANGE_TABLE = "timestamp_nanos_range"; - - // TEST DATA + // ============================================================================ + // UNIFIED SCHEMA - Contains all timestamp column types + // ============================================================================ // - // Tables have 4 timestamp columns written at different precisions: - // ts_picos - written with 12 fractional digits (picoseconds) - // ts_nanos - written with 9 fractional digits (nanoseconds) - // ts_micros - written with 6 fractional digits (microseconds) - // ts_millis - written with 3 fractional digits (milliseconds) - - /* - * FULL_RANGE table - for PICOS and MICROS reads - * Contains dates outside int64 nanos bounds (0001 and 9999) - */ - private static final List FULL_RANGE_WRITE = - ImmutableList.of( - row( - "2024-01-15T10:30:45.123456789012Z", - "2024-01-15 10:30:45.123456789 UTC", - "2024-01-15 10:30:45.123456 UTC", - "2024-01-15 10:30:45.123 UTC"), - row( - "2024-06-20T15:45:30.987654321098Z", - "2024-06-20 15:45:30.987654321 UTC", - "2024-06-20 15:45:30.987654 UTC", - "2024-06-20 15:45:30.987 UTC"), - row( - "0001-01-01T00:00:00.111222333444Z", - "0001-01-01 00:00:00.111222333 UTC", - "0001-01-01 00:00:00.111222 UTC", - "0001-01-01 00:00:00.111 UTC"), - row( - "1970-01-01T00:00:00.555666777888Z", - "1970-01-01 00:00:00.555666777 UTC", - "1970-01-01 00:00:00.555666 UTC", - "1970-01-01 00:00:00.555 UTC"), - row( - "9999-12-31T23:59:59.999888777666Z", - "9999-12-31 23:59:59.999888777 UTC", - "9999-12-31 23:59:59.999888 UTC", - "9999-12-31 23:59:59.999 UTC")); - - /* - * NANOS_RANGE table - for NANOS reads - * Only dates within int64 nanos-since-epoch bounds (~1678 to ~2261) - */ - private static final List NANOS_RANGE_WRITE = - ImmutableList.of( - row( - "2024-01-15T10:30:45.123456789012Z", - "2024-01-15 10:30:45.123456789 UTC", - "2024-01-15 10:30:45.123456 UTC", - "2024-01-15 10:30:45.123 UTC"), - row( - "2024-06-20T15:45:30.987654321098Z", - "2024-06-20 15:45:30.987654321 UTC", - "2024-06-20 15:45:30.987654 UTC", - "2024-06-20 15:45:30.987 UTC"), - row( - "1678-09-21T00:12:43.145224192555Z", - "1678-09-21 00:12:43.145224192 UTC", - "1678-09-21 00:12:43.145224 UTC", - "1678-09-21 00:12:43.145 UTC"), - row( - "1970-01-01T00:00:00.555666777888Z", - "1970-01-01 00:00:00.555666777 UTC", - "1970-01-01 00:00:00.555666 UTC", - "1970-01-01 00:00:00.555 UTC"), - row( - "2261-04-11T23:47:16.854775807333Z", - "2261-04-11 23:47:16.854775807 UTC", - "2261-04-11 23:47:16.854775 UTC", - "2261-04-11 23:47:16.854 UTC")); - - private static final List 
FULL_RANGE_READ_PICOS = - ImmutableList.of( - row( - "2024-01-15T10:30:45.123456789012Z", - "2024-01-15T10:30:45.123456789000Z", - "2024-01-15T10:30:45.123456000000Z", - "2024-01-15T10:30:45.123000000000Z"), - row( - "2024-06-20T15:45:30.987654321098Z", - "2024-06-20T15:45:30.987654321000Z", - "2024-06-20T15:45:30.987654000000Z", - "2024-06-20T15:45:30.987000000000Z"), - row( - "0001-01-01T00:00:00.111222333444Z", - "0001-01-01T00:00:00.111222333000Z", - "0001-01-01T00:00:00.111222000000Z", - "0001-01-01T00:00:00.111000000000Z"), - row( - "1970-01-01T00:00:00.555666777888Z", - "1970-01-01T00:00:00.555666777000Z", - "1970-01-01T00:00:00.555666000000Z", - "1970-01-01T00:00:00.555000000000Z"), - row( - "9999-12-31T23:59:59.999888777666Z", - "9999-12-31T23:59:59.999888777000Z", - "9999-12-31T23:59:59.999888000000Z", - "9999-12-31T23:59:59.999000000000Z")); - - private static final List NANOS_RANGE_READ_NANOS = - ImmutableList.of( - row( - "2024-01-15 10:30:45.123456789 UTC", - "2024-01-15 10:30:45.123456789 UTC", - "2024-01-15 10:30:45.123456 UTC", - "2024-01-15 10:30:45.123 UTC"), - row( - "2024-06-20 15:45:30.987654321 UTC", - "2024-06-20 15:45:30.987654321 UTC", - "2024-06-20 15:45:30.987654 UTC", - "2024-06-20 15:45:30.987 UTC"), - row( - "1678-09-21 00:12:43.145224192 UTC", - "1678-09-21 00:12:43.145224192 UTC", - "1678-09-21 00:12:43.145224 UTC", - "1678-09-21 00:12:43.145 UTC"), - row( - "1970-01-01 00:00:00.555666777 UTC", - "1970-01-01 00:00:00.555666777 UTC", - "1970-01-01 00:00:00.555666 UTC", - "1970-01-01 00:00:00.555 UTC"), - row( - "2261-04-11 23:47:16.854775807 UTC", - "2261-04-11 23:47:16.854775807 UTC", - "2261-04-11 23:47:16.854775 UTC", - "2261-04-11 23:47:16.854 UTC")); - - private static final List FULL_RANGE_READ_MICROS = - ImmutableList.of( - row( - "2024-01-15 10:30:45.123456 UTC", - "2024-01-15 10:30:45.123456 UTC", - "2024-01-15 10:30:45.123456 UTC", - "2024-01-15 10:30:45.123 UTC"), - row( - "2024-06-20 15:45:30.987654 UTC", - "2024-06-20 15:45:30.987654 UTC", - "2024-06-20 15:45:30.987654 UTC", - "2024-06-20 15:45:30.987 UTC"), - row( - "0001-01-01 00:00:00.111222 UTC", - "0001-01-01 00:00:00.111222 UTC", - "0001-01-01 00:00:00.111222 UTC", - "0001-01-01 00:00:00.111 UTC"), - row( - "1970-01-01 00:00:00.555666 UTC", - "1970-01-01 00:00:00.555666 UTC", - "1970-01-01 00:00:00.555666 UTC", - "1970-01-01 00:00:00.555 UTC"), - row( - "9999-12-31 23:59:59.999888 UTC", - "9999-12-31 23:59:59.999888 UTC", - "9999-12-31 23:59:59.999888 UTC", - "9999-12-31 23:59:59.999 UTC")); - - private static final List FULL_RANGE_READ_MICROS_ARROW = + // Schema structure: + // - ts_simple: TIMESTAMP(12) + // - ts_array: ARRAY + // - event: STRUCT< + // name: STRING, + // ts: TIMESTAMP(12) + // > + // - events: ARRAY> + // - ts_map: ARRAY> + + private static final TableSchema SCHEMA = + new TableSchema() + .setFields( + ImmutableList.of( + // Simple timestamp column + new TableFieldSchema() + .setName("ts_simple") + .setType("TIMESTAMP") + .setTimestampPrecision(PICOS_PRECISION), + // Array of timestamps + new TableFieldSchema() + .setName("ts_array") + .setType("TIMESTAMP") + .setTimestampPrecision(PICOS_PRECISION) + .setMode("REPEATED"), + // Nested struct with timestamp + new TableFieldSchema() + .setName("event") + .setType("STRUCT") + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema() + .setName("ts") + .setType("TIMESTAMP") + .setTimestampPrecision(PICOS_PRECISION))), + // Repeated struct with timestamp + new 
TableFieldSchema() + .setName("events") + .setType("STRUCT") + .setMode("REPEATED") + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema() + .setName("ts") + .setType("TIMESTAMP") + .setTimestampPrecision(PICOS_PRECISION))), + // Map-like: repeated struct with timestamp key and value + new TableFieldSchema() + .setName("ts_map") + .setType("STRUCT") + .setMode("REPEATED") + .setFields( + ImmutableList.of( + new TableFieldSchema() + .setName("key") + .setType("TIMESTAMP") + .setTimestampPrecision(PICOS_PRECISION), + new TableFieldSchema() + .setName("value") + .setType("TIMESTAMP") + .setTimestampPrecision(PICOS_PRECISION))))); + + // ============================================================================ + // TEST DATA - Written once, read with different precision settings + // ============================================================================ + + private static final List WRITE_DATA = ImmutableList.of( - row( - "2024-01-15 10:30:45.123 UTC", - "2024-01-15 10:30:45.123 UTC", - "2024-01-15 10:30:45.123 UTC", - "2024-01-15 10:30:45.123 UTC"), - row( - "2024-06-20 15:45:30.987 UTC", - "2024-06-20 15:45:30.987 UTC", - "2024-06-20 15:45:30.987 UTC", - "2024-06-20 15:45:30.987 UTC"), - row( - "0001-01-01 00:00:00.111 UTC", - "0001-01-01 00:00:00.111 UTC", - "0001-01-01 00:00:00.111 UTC", - "0001-01-01 00:00:00.111 UTC"), - row( - "1970-01-01 00:00:00.555 UTC", - "1970-01-01 00:00:00.555 UTC", - "1970-01-01 00:00:00.555 UTC", - "1970-01-01 00:00:00.555 UTC"), - row( - "9999-12-31 23:59:59.999 UTC", - "9999-12-31 23:59:59.999 UTC", - "9999-12-31 23:59:59.999 UTC", - "9999-12-31 23:59:59.999 UTC")); - - private static TableRow row(String picos, String nanos, String micros, String millis) { - return new TableRow() - .set("ts_picos", picos) - .set("ts_nanos", nanos) - .set("ts_micros", micros) - .set("ts_millis", millis); - } + new TableRow() + .set("ts_simple", "2024-01-15T10:30:45.123456789012Z") + .set( + "ts_array", + ImmutableList.of( + "2024-01-15T10:30:45.111111111111Z", "2024-06-20T15:45:30.222222222222Z")) + .set( + "event", + new TableRow() + .set("name", "login") + .set("ts", "2024-01-15T10:30:45.333333333333Z")) + .set( + "events", + ImmutableList.of( + new TableRow() + .set("name", "click") + .set("ts", "2024-01-15T10:30:45.444444444444Z"), + new TableRow() + .set("name", "scroll") + .set("ts", "2024-01-15T10:30:45.555555555555Z"))) + .set( + "ts_map", + ImmutableList.of( + new TableRow() + .set("key", "2024-01-15T10:30:45.666666666666Z") + .set("value", "2024-01-15T10:30:45.777777777777Z"))), + new TableRow() + .set("ts_simple", "1970-01-01T00:00:00.000000000001Z") + .set("ts_array", ImmutableList.of("1970-01-01T00:00:00.000000000002Z")) + .set( + "event", + new TableRow() + .set("name", "epoch") + .set("ts", "1970-01-01T00:00:00.000000000003Z")) + .set( + "events", + ImmutableList.of( + new TableRow() + .set("name", "start") + .set("ts", "1970-01-01T00:00:00.000000000004Z"))) + .set( + "ts_map", + ImmutableList.of( + new TableRow() + .set("key", "1970-01-01T00:00:00.000000000005Z") + .set("value", "1970-01-01T00:00:00.000000000006Z")))); @BeforeClass public static void setup() throws Exception { bqOptions = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class); project = bqOptions.as(GcpOptions.class).getProject(); - BQ_CLIENT.createNewDataset(project, DATASET_ID, null, "us-central1"); + tableSpec = String.format("%s:%s.%s", project, DATASET_ID, "timestamp_picos_test"); - fullRangeTableSpec = 
String.format("%s:%s.%s", project, DATASET_ID, FULL_RANGE_TABLE); - nanosRangeTableSpec = String.format("%s:%s.%s", project, DATASET_ID, NANOS_RANGE_TABLE); - - // Write full range table - Pipeline writePipeline1 = Pipeline.create(bqOptions); - writePipeline1 - .apply("CreateFullRange", Create.of(FULL_RANGE_WRITE)) + // Write test data + Pipeline writePipeline = Pipeline.create(bqOptions); + writePipeline + .apply("CreateData", Create.of(WRITE_DATA)) .apply( - "WriteFullRange", + "WriteData", BigQueryIO.writeTableRows() - .to(fullRangeTableSpec) - .withSchema(createSchema()) + .to(tableSpec) + .withSchema(SCHEMA) .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); - writePipeline1.run().waitUntilFinish(); - - // Write nanos range table - Pipeline writePipeline2 = Pipeline.create(bqOptions); - writePipeline2 - .apply("CreateNanosRange", Create.of(NANOS_RANGE_WRITE)) - .apply( - "WriteNanosRange", - BigQueryIO.writeTableRows() - .to(nanosRangeTableSpec) - .withSchema(createSchema()) - .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); - writePipeline2.run().waitUntilFinish(); + writePipeline.run().waitUntilFinish(); } @AfterClass @@ -302,129 +209,264 @@ public static void cleanup() { BQ_CLIENT.deleteDataset(project, DATASET_ID); } - private static TableSchema createSchema() { - return new TableSchema() - .setFields( - ImmutableList.of( - new TableFieldSchema() - .setName("ts_picos") - .setType("TIMESTAMP") - .setTimestampPrecision(12L), - new TableFieldSchema() - .setName("ts_nanos") - .setType("TIMESTAMP") - .setTimestampPrecision(12L), - new TableFieldSchema() - .setName("ts_micros") - .setType("TIMESTAMP") - .setTimestampPrecision(12L), - new TableFieldSchema() - .setName("ts_millis") - .setType("TIMESTAMP") - .setTimestampPrecision(12L))); - } - - private void runReadTest( - TimestampPrecision precision, - DataFormat format, - String tableSpec, - String tableName, - List expectedRows) { - - Pipeline readPipeline = Pipeline.create(bqOptions); - - PCollection fromTable = - readPipeline.apply( - "ReadFromTable", - BigQueryIO.readTableRows() - .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) - .withFormat(format) - .withDirectReadPicosTimestampPrecision(precision) - .from(tableSpec)); - PCollection fromTableWithSchema = - readPipeline.apply( - "ReadFromTableWithSchema", - BigQueryIO.readTableRowsWithSchema() - .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) - .withFormat(format) - .withDirectReadPicosTimestampPrecision(precision) - .from(tableSpec)); - - PCollection fromQuery = - readPipeline.apply( - "ReadFromQuery", - BigQueryIO.readTableRows() - .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) - .fromQuery(String.format("SELECT * FROM %s.%s.%s", project, DATASET_ID, tableName)) - .usingStandardSql() - .withFormat(format) - .withDirectReadPicosTimestampPrecision(precision)); - - PAssert.that(fromTable).containsInAnyOrder(expectedRows); - PAssert.that(fromQuery).containsInAnyOrder(expectedRows); - PAssert.that(fromTableWithSchema).containsInAnyOrder(expectedRows); - - readPipeline.run().waitUntilFinish(); - } - @Test - public void testRead_Picos_Avro() { - runReadTest( - TimestampPrecision.PICOS, - DataFormat.AVRO, - fullRangeTableSpec, - FULL_RANGE_TABLE, - 
FULL_RANGE_READ_PICOS); + public void testReadWithPicosPrecision_Avro() { + // WRITE DATA (written in @BeforeClass): + // ts_simple: "2024-01-15T10:30:45.123456789012Z" (12 digits) + // ts_array: ["...111111111111Z", "...222222222222Z"] + // event: {name: "login", ts: "...333333333333Z"} + // events: [{name: "click", ts: "...444444444444Z"}, ...] + // ts_map: [{key: "...666666666666Z", value: "...777777777777Z"}] + // + // READ SETTINGS: + // precision: PICOS (12 digits) + // format: AVRO + // + // EXPECTED: All 12 digits preserved in ISO format + + List expectedOutput = + ImmutableList.of( + new TableRow() + .set("ts_simple", "2024-01-15T10:30:45.123456789012Z") + .set( + "ts_array", + ImmutableList.of( + "2024-01-15T10:30:45.111111111111Z", "2024-06-20T15:45:30.222222222222Z")) + .set( + "event", + new TableRow() + .set("name", "login") + .set("ts", "2024-01-15T10:30:45.333333333333Z")) + .set( + "events", + ImmutableList.of( + new TableRow() + .set("name", "click") + .set("ts", "2024-01-15T10:30:45.444444444444Z"), + new TableRow() + .set("name", "scroll") + .set("ts", "2024-01-15T10:30:45.555555555555Z"))) + .set( + "ts_map", + ImmutableList.of( + new TableRow() + .set("key", "2024-01-15T10:30:45.666666666666Z") + .set("value", "2024-01-15T10:30:45.777777777777Z"))), + new TableRow() + .set("ts_simple", "1970-01-01T00:00:00.000000000001Z") + .set("ts_array", ImmutableList.of("1970-01-01T00:00:00.000000000002Z")) + .set( + "event", + new TableRow() + .set("name", "epoch") + .set("ts", "1970-01-01T00:00:00.000000000003Z")) + .set( + "events", + ImmutableList.of( + new TableRow() + .set("name", "start") + .set("ts", "1970-01-01T00:00:00.000000000004Z"))) + .set( + "ts_map", + ImmutableList.of( + new TableRow() + .set("key", "1970-01-01T00:00:00.000000000005Z") + .set("value", "1970-01-01T00:00:00.000000000006Z")))); + + runReadTest(TimestampPrecision.PICOS, DataFormat.AVRO, expectedOutput); } @Test - public void testRead_Picos_Arrow() { - runReadTest( - TimestampPrecision.PICOS, - DataFormat.ARROW, - fullRangeTableSpec, - FULL_RANGE_TABLE, - FULL_RANGE_READ_PICOS); + public void testReadWithPicosPrecision_Arrow() { + // WRITE DATA: Same as above (12-digit picosecond timestamps) + // + // READ SETTINGS: + // precision: PICOS (12 digits) + // format: ARROW + // + // EXPECTED: All 12 digits preserved in ISO format + + List expectedOutput = + ImmutableList.of( + new TableRow() + .set("ts_simple", "2024-01-15T10:30:45.123456789012Z") + .set( + "ts_array", + ImmutableList.of( + "2024-01-15T10:30:45.111111111111Z", "2024-06-20T15:45:30.222222222222Z")) + .set( + "event", + new TableRow() + .set("name", "login") + .set("ts", "2024-01-15T10:30:45.333333333333Z")) + .set( + "events", + ImmutableList.of( + new TableRow() + .set("name", "click") + .set("ts", "2024-01-15T10:30:45.444444444444Z"), + new TableRow() + .set("name", "scroll") + .set("ts", "2024-01-15T10:30:45.555555555555Z"))) + .set( + "ts_map", + ImmutableList.of( + new TableRow() + .set("key", "2024-01-15T10:30:45.666666666666Z") + .set("value", "2024-01-15T10:30:45.777777777777Z"))), + new TableRow() + .set("ts_simple", "1970-01-01T00:00:00.000000000001Z") + .set("ts_array", ImmutableList.of("1970-01-01T00:00:00.000000000002Z")) + .set( + "event", + new TableRow() + .set("name", "epoch") + .set("ts", "1970-01-01T00:00:00.000000000003Z")) + .set( + "events", + ImmutableList.of( + new TableRow() + .set("name", "start") + .set("ts", "1970-01-01T00:00:00.000000000004Z"))) + .set( + "ts_map", + ImmutableList.of( + new TableRow() + 
.set("key", "1970-01-01T00:00:00.000000000005Z") + .set("value", "1970-01-01T00:00:00.000000000006Z")))); + + runReadTest(TimestampPrecision.PICOS, DataFormat.ARROW, expectedOutput); } @Test - public void testRead_Nanos_Avro() { - runReadTest( - TimestampPrecision.NANOS, - DataFormat.AVRO, - nanosRangeTableSpec, - NANOS_RANGE_TABLE, - NANOS_RANGE_READ_NANOS); + public void testReadWithNanosPrecision_Avro() { + // WRITE DATA: 12-digit picosecond timestamps + // + // READ SETTINGS: + // precision: NANOS (9 digits) + // format: AVRO + // + // EXPECTED: Truncated to 9 digits, UTC format + // "2024-01-15T10:30:45.123456789012Z" → "2024-01-15 10:30:45.123456789 UTC" + + List expectedOutput = + ImmutableList.of( + new TableRow() + .set("ts_simple", "2024-01-15 10:30:45.123456789 UTC") + .set( + "ts_array", + ImmutableList.of( + "2024-01-15 10:30:45.111111111 UTC", "2024-06-20 15:45:30.222222222 UTC")) + .set( + "event", + new TableRow() + .set("name", "login") + .set("ts", "2024-01-15 10:30:45.333333333 UTC")) + .set( + "events", + ImmutableList.of( + new TableRow() + .set("name", "click") + .set("ts", "2024-01-15 10:30:45.444444444 UTC"), + new TableRow() + .set("name", "scroll") + .set("ts", "2024-01-15 10:30:45.555555555 UTC"))) + .set( + "ts_map", + ImmutableList.of( + new TableRow() + .set("key", "2024-01-15 10:30:45.666666666 UTC") + .set("value", "2024-01-15 10:30:45.777777777 UTC"))), + new TableRow() + .set("ts_simple", "1970-01-01 00:00:00 UTC") // .000000000 truncated + .set("ts_array", ImmutableList.of("1970-01-01 00:00:00 UTC")) + .set( + "event", + new TableRow().set("name", "epoch").set("ts", "1970-01-01 00:00:00 UTC")) + .set( + "events", + ImmutableList.of( + new TableRow().set("name", "start").set("ts", "1970-01-01 00:00:00 UTC"))) + .set( + "ts_map", + ImmutableList.of( + new TableRow() + .set("key", "1970-01-01 00:00:00 UTC") + .set("value", "1970-01-01 00:00:00 UTC")))); + + runReadTest(TimestampPrecision.NANOS, DataFormat.AVRO, expectedOutput); } @Test - public void testRead_Nanos_Arrow() { - runReadTest( - TimestampPrecision.NANOS, - DataFormat.ARROW, - nanosRangeTableSpec, - NANOS_RANGE_TABLE, - NANOS_RANGE_READ_NANOS); + public void testReadWithMicrosPrecision_Avro() { + // WRITE DATA: 12-digit picosecond timestamps + // + // READ SETTINGS: + // precision: MICROS (6 digits) + // format: AVRO + // + // EXPECTED: Truncated to 6 digits, UTC format + // "2024-01-15T10:30:45.123456789012Z" → "2024-01-15 10:30:45.123456 UTC" + + List expectedOutput = + ImmutableList.of( + new TableRow() + .set("ts_simple", "2024-01-15 10:30:45.123456 UTC") + .set( + "ts_array", + ImmutableList.of( + "2024-01-15 10:30:45.111111 UTC", "2024-06-20 15:45:30.222222 UTC")) + .set( + "event", + new TableRow().set("name", "login").set("ts", "2024-01-15 10:30:45.333333 UTC")) + .set( + "events", + ImmutableList.of( + new TableRow() + .set("name", "click") + .set("ts", "2024-01-15 10:30:45.444444 UTC"), + new TableRow() + .set("name", "scroll") + .set("ts", "2024-01-15 10:30:45.555555 UTC"))) + .set( + "ts_map", + ImmutableList.of( + new TableRow() + .set("key", "2024-01-15 10:30:45.666666 UTC") + .set("value", "2024-01-15 10:30:45.777777 UTC"))), + new TableRow() + .set("ts_simple", "1970-01-01 00:00:00 UTC") + .set("ts_array", ImmutableList.of("1970-01-01 00:00:00 UTC")) + .set( + "event", + new TableRow().set("name", "epoch").set("ts", "1970-01-01 00:00:00 UTC")) + .set( + "events", + ImmutableList.of( + new TableRow().set("name", "start").set("ts", "1970-01-01 00:00:00 UTC"))) + .set( + "ts_map", + 
ImmutableList.of( + new TableRow() + .set("key", "1970-01-01 00:00:00 UTC") + .set("value", "1970-01-01 00:00:00 UTC")))); + + runReadTest(TimestampPrecision.MICROS, DataFormat.AVRO, expectedOutput); } + private void runReadTest( + TimestampPrecision precision, DataFormat format, List expectedOutput) { + Pipeline readPipeline = Pipeline.create(bqOptions); - @Test - public void testRead_Micros_Avro() { - runReadTest( - TimestampPrecision.MICROS, - DataFormat.AVRO, - fullRangeTableSpec, - FULL_RANGE_TABLE, - FULL_RANGE_READ_MICROS); - } + PCollection result = + readPipeline.apply( + String.format("Read_%s_%s", precision, format), + BigQueryIO.readTableRows() + .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) + .withFormat(format) + .withDirectReadPicosTimestampPrecision(precision) + .from(tableSpec)); - @Test - public void testRead_Micros_Arrow() { - // Known issue: Arrow MICROS truncates to milliseconds - runReadTest( - TimestampPrecision.MICROS, - DataFormat.ARROW, - fullRangeTableSpec, - FULL_RANGE_TABLE, - FULL_RANGE_READ_MICROS_ARROW); + PAssert.that(result).containsInAnyOrder(expectedOutput); + readPipeline.run().waitUntilFinish(); } } From dcb1e017b726e9f61c85b2ebd307e9237943eca0 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Dec 2025 12:24:35 +0000 Subject: [PATCH 4/7] Fix tests. --- .../bigquery/BigQueryTimestampPicosIT.java | 197 +++++++----------- 1 file changed, 70 insertions(+), 127 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java index d956a98c4bbb..7af674c2ed65 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java @@ -53,29 +53,10 @@ public class BigQueryTimestampPicosIT { "bq_ts_picos_" + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32); private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryTimestampPicosIT"); private static TestBigQueryOptions bqOptions; - private static String tableSpec; + private static String nestedTableSpec; + private static String simpleTableSpec; - // ============================================================================ - // UNIFIED SCHEMA - Contains all timestamp column types - // ============================================================================ - // - // Schema structure: - // - ts_simple: TIMESTAMP(12) - // - ts_array: ARRAY - // - event: STRUCT< - // name: STRING, - // ts: TIMESTAMP(12) - // > - // - events: ARRAY> - // - ts_map: ARRAY> - - private static final TableSchema SCHEMA = + private static final TableSchema NESTED_SCHEMA = new TableSchema() .setFields( ImmutableList.of( @@ -129,11 +110,20 @@ public class BigQueryTimestampPicosIT { .setType("TIMESTAMP") .setTimestampPrecision(PICOS_PRECISION))))); + private static final TableSchema SIMPLE_SCHEMA = + new TableSchema() + .setFields( + ImmutableList.of( + // Simple timestamp column + new TableFieldSchema() + .setName("ts_simple") + .setType("TIMESTAMP") + .setTimestampPrecision(PICOS_PRECISION))); + // ============================================================================ // TEST DATA - Written once, read with different precision settings // ============================================================================ - - 
private static final List WRITE_DATA = + private static final List NESTED_WRITE_DATA = ImmutableList.of( new TableRow() .set("ts_simple", "2024-01-15T10:30:45.123456789012Z") @@ -162,7 +152,7 @@ public class BigQueryTimestampPicosIT { .set("key", "2024-01-15T10:30:45.666666666666Z") .set("value", "2024-01-15T10:30:45.777777777777Z"))), new TableRow() - .set("ts_simple", "1970-01-01T00:00:00.000000000001Z") + .set("ts_simple", "1890-01-01T00:00:00.123456789123Z") .set("ts_array", ImmutableList.of("1970-01-01T00:00:00.000000000002Z")) .set( "event", @@ -182,22 +172,38 @@ public class BigQueryTimestampPicosIT { .set("key", "1970-01-01T00:00:00.000000000005Z") .set("value", "1970-01-01T00:00:00.000000000006Z")))); + private static final List SIMPLE_WRITE_DATA = + ImmutableList.of( + new TableRow().set("ts_simple", "2024-01-15T10:30:45.123456789012Z"), + new TableRow().set("ts_simple", "1890-01-01T00:00:00.123456789123Z")); + @BeforeClass public static void setup() throws Exception { bqOptions = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class); project = bqOptions.as(GcpOptions.class).getProject(); BQ_CLIENT.createNewDataset(project, DATASET_ID, null, "us-central1"); - tableSpec = String.format("%s:%s.%s", project, DATASET_ID, "timestamp_picos_test"); + nestedTableSpec = String.format("%s:%s.%s", project, DATASET_ID, "nested_timestamp_picos_test"); + simpleTableSpec = String.format("%s:%s.%s", project, DATASET_ID, "simple_timestamp_picos_test"); // Write test data Pipeline writePipeline = Pipeline.create(bqOptions); writePipeline - .apply("CreateData", Create.of(WRITE_DATA)) + .apply("CreateNestedData", Create.of(NESTED_WRITE_DATA)) .apply( - "WriteData", + "WriteNestedData", BigQueryIO.writeTableRows() - .to(tableSpec) - .withSchema(SCHEMA) + .to(nestedTableSpec) + .withSchema(NESTED_SCHEMA) + .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); + writePipeline + .apply("CreateSimpleData", Create.of(SIMPLE_WRITE_DATA)) + .apply( + "WriteSimpleData", + BigQueryIO.writeTableRows() + .to(simpleTableSpec) + .withSchema(SIMPLE_SCHEMA) .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); @@ -211,80 +217,6 @@ public static void cleanup() { @Test public void testReadWithPicosPrecision_Avro() { - // WRITE DATA (written in @BeforeClass): - // ts_simple: "2024-01-15T10:30:45.123456789012Z" (12 digits) - // ts_array: ["...111111111111Z", "...222222222222Z"] - // event: {name: "login", ts: "...333333333333Z"} - // events: [{name: "click", ts: "...444444444444Z"}, ...] 
- // ts_map: [{key: "...666666666666Z", value: "...777777777777Z"}] - // - // READ SETTINGS: - // precision: PICOS (12 digits) - // format: AVRO - // - // EXPECTED: All 12 digits preserved in ISO format - - List expectedOutput = - ImmutableList.of( - new TableRow() - .set("ts_simple", "2024-01-15T10:30:45.123456789012Z") - .set( - "ts_array", - ImmutableList.of( - "2024-01-15T10:30:45.111111111111Z", "2024-06-20T15:45:30.222222222222Z")) - .set( - "event", - new TableRow() - .set("name", "login") - .set("ts", "2024-01-15T10:30:45.333333333333Z")) - .set( - "events", - ImmutableList.of( - new TableRow() - .set("name", "click") - .set("ts", "2024-01-15T10:30:45.444444444444Z"), - new TableRow() - .set("name", "scroll") - .set("ts", "2024-01-15T10:30:45.555555555555Z"))) - .set( - "ts_map", - ImmutableList.of( - new TableRow() - .set("key", "2024-01-15T10:30:45.666666666666Z") - .set("value", "2024-01-15T10:30:45.777777777777Z"))), - new TableRow() - .set("ts_simple", "1970-01-01T00:00:00.000000000001Z") - .set("ts_array", ImmutableList.of("1970-01-01T00:00:00.000000000002Z")) - .set( - "event", - new TableRow() - .set("name", "epoch") - .set("ts", "1970-01-01T00:00:00.000000000003Z")) - .set( - "events", - ImmutableList.of( - new TableRow() - .set("name", "start") - .set("ts", "1970-01-01T00:00:00.000000000004Z"))) - .set( - "ts_map", - ImmutableList.of( - new TableRow() - .set("key", "1970-01-01T00:00:00.000000000005Z") - .set("value", "1970-01-01T00:00:00.000000000006Z")))); - - runReadTest(TimestampPrecision.PICOS, DataFormat.AVRO, expectedOutput); - } - - @Test - public void testReadWithPicosPrecision_Arrow() { - // WRITE DATA: Same as above (12-digit picosecond timestamps) - // - // READ SETTINGS: - // precision: PICOS (12 digits) - // format: ARROW - // - // EXPECTED: All 12 digits preserved in ISO format List expectedOutput = ImmutableList.of( @@ -315,7 +247,7 @@ public void testReadWithPicosPrecision_Arrow() { .set("key", "2024-01-15T10:30:45.666666666666Z") .set("value", "2024-01-15T10:30:45.777777777777Z"))), new TableRow() - .set("ts_simple", "1970-01-01T00:00:00.000000000001Z") + .set("ts_simple", "1890-01-01T00:00:00.123456789123Z") .set("ts_array", ImmutableList.of("1970-01-01T00:00:00.000000000002Z")) .set( "event", @@ -335,19 +267,11 @@ public void testReadWithPicosPrecision_Arrow() { .set("key", "1970-01-01T00:00:00.000000000005Z") .set("value", "1970-01-01T00:00:00.000000000006Z")))); - runReadTest(TimestampPrecision.PICOS, DataFormat.ARROW, expectedOutput); + runReadTest(TimestampPrecision.PICOS, DataFormat.AVRO, expectedOutput, nestedTableSpec); } @Test public void testReadWithNanosPrecision_Avro() { - // WRITE DATA: 12-digit picosecond timestamps - // - // READ SETTINGS: - // precision: NANOS (9 digits) - // format: AVRO - // - // EXPECTED: Truncated to 9 digits, UTC format - // "2024-01-15T10:30:45.123456789012Z" → "2024-01-15 10:30:45.123456789 UTC" List expectedOutput = ImmutableList.of( @@ -378,7 +302,7 @@ public void testReadWithNanosPrecision_Avro() { .set("key", "2024-01-15 10:30:45.666666666 UTC") .set("value", "2024-01-15 10:30:45.777777777 UTC"))), new TableRow() - .set("ts_simple", "1970-01-01 00:00:00 UTC") // .000000000 truncated + .set("ts_simple", "1890-01-01 00:00:00.123456789 UTC") .set("ts_array", ImmutableList.of("1970-01-01 00:00:00 UTC")) .set( "event", @@ -394,19 +318,11 @@ public void testReadWithNanosPrecision_Avro() { .set("key", "1970-01-01 00:00:00 UTC") .set("value", "1970-01-01 00:00:00 UTC")))); - runReadTest(TimestampPrecision.NANOS, 
DataFormat.AVRO, expectedOutput); + runReadTest(TimestampPrecision.NANOS, DataFormat.AVRO, expectedOutput, nestedTableSpec); } @Test public void testReadWithMicrosPrecision_Avro() { - // WRITE DATA: 12-digit picosecond timestamps - // - // READ SETTINGS: - // precision: MICROS (6 digits) - // format: AVRO - // - // EXPECTED: Truncated to 6 digits, UTC format - // "2024-01-15T10:30:45.123456789012Z" → "2024-01-15 10:30:45.123456 UTC" List expectedOutput = ImmutableList.of( @@ -435,7 +351,7 @@ public void testReadWithMicrosPrecision_Avro() { .set("key", "2024-01-15 10:30:45.666666 UTC") .set("value", "2024-01-15 10:30:45.777777 UTC"))), new TableRow() - .set("ts_simple", "1970-01-01 00:00:00 UTC") + .set("ts_simple", "1890-01-01 00:00:00.123456 UTC") .set("ts_array", ImmutableList.of("1970-01-01 00:00:00 UTC")) .set( "event", @@ -451,10 +367,37 @@ public void testReadWithMicrosPrecision_Avro() { .set("key", "1970-01-01 00:00:00 UTC") .set("value", "1970-01-01 00:00:00 UTC")))); - runReadTest(TimestampPrecision.MICROS, DataFormat.AVRO, expectedOutput); + runReadTest(TimestampPrecision.MICROS, DataFormat.AVRO, expectedOutput, nestedTableSpec); + } + + @Test + public void testReadWithPicosPrecision_Arrow() { + + List expectedOutput = + ImmutableList.of( + new TableRow().set("ts_simple", "2024-01-15T10:30:45.123456789012Z"), + new TableRow().set("ts_simple", "1890-01-01T00:00:00.123456789123Z")); + + runReadTest(TimestampPrecision.PICOS, DataFormat.ARROW, expectedOutput, simpleTableSpec); } + + @Test + public void testReadWithNanosPrecision_Arrow() { + + List expectedOutput = + ImmutableList.of( + new TableRow().set("ts_simple", "2024-01-15 10:30:45.123456789 UTC"), + new TableRow().set("ts_simple", "1890-01-01 00:00:00.123456789 UTC")); + + runReadTest(TimestampPrecision.NANOS, DataFormat.ARROW, expectedOutput, simpleTableSpec); + } + + private void runReadTest( - TimestampPrecision precision, DataFormat format, List expectedOutput) { + TimestampPrecision precision, + DataFormat format, + List expectedOutput, + String tableSpec) { Pipeline readPipeline = Pipeline.create(bqOptions); PCollection result = From 0f89aa885de6e5df4fd1655e66528283504b520f Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Dec 2025 12:33:31 +0000 Subject: [PATCH 5/7] spotless --- .../beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java index 7af674c2ed65..5deffd4028f9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java @@ -392,7 +392,6 @@ public void testReadWithNanosPrecision_Arrow() { runReadTest(TimestampPrecision.NANOS, DataFormat.ARROW, expectedOutput, simpleTableSpec); } - private void runReadTest( TimestampPrecision precision, DataFormat format, From 5134c8c4ab71f32193dd51ce98d4ca57a331167b Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 23 Dec 2025 06:59:32 +0000 Subject: [PATCH 6/7] comments. 
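Address reviewer comments: move the parsing logic into TimestampPicos itself as a
static factory method (TimestampPicos.fromString), and exclude the new integration
test from the Dataflow worker integration suites.

For reference, a minimal sketch of the factory's behavior after this change; the
example class name and the values in the comments are illustrative only, not part
of the patch:

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.TimestampPicos;

    class TimestampPicosFromStringExample {
      public static void main(String[] args) {
        // ISO form with exactly 12 fractional digits keeps full picosecond
        // precision: seconds = 1705314645, picoseconds = 123456789012.
        TimestampPicos iso =
            TimestampPicos.fromString("2024-01-15T10:30:45.123456789012Z");

        // "UTC" form carries at most nanoseconds; nanos are scaled to picos:
        // seconds = 1705314645, picoseconds = 123456789000.
        TimestampPicos utc =
            TimestampPicos.fromString("2024-01-15 10:30:45.123456789 UTC");
      }
    }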
--- .../google-cloud-dataflow-java/build.gradle | 2 +
 .../sdk/io/gcp/bigquery/BigQueryUtils.java | 82 +++++++++----------
 .../bigquery/TableRowToStorageApiProto.java | 4 +-
 .../io/gcp/bigquery/BigQueryUtilsTest.java | 2 +-
 4 files changed, 46 insertions(+), 44 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index 50498d24c624..3792626a1fdf 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -643,6 +643,7 @@ task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test, dependsOn: copyG
   exclude '**/BigQueryIODynamicQueryIT.class'
   exclude '**/BigQueryIODynamicReadIT.class'
   exclude '**/BigQueryIODynamicReadTableRowIT.class'
+  exclude '**/BigQueryTimestampPicosIT.class'
   exclude '**/PubsubReadIT.class'
   exclude '**/FhirIOReadIT.class'
   exclude '**/DicomIOReadIT.class'
@@ -698,6 +699,7 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) {
   exclude '**/BigQueryIODynamicQueryIT.class'
   exclude '**/BigQueryIODynamicReadIT.class'
   exclude '**/BigQueryIODynamicReadTableRowIT.class'
+  exclude '**/BigQueryTimestampPicosIT.class'
   exclude '**/SpannerWriteIT.class'
   exclude '**/*KmsKeyIT.class'
   exclude '**/FhirIOReadIT.class'
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 2ed3eab5cb7e..948987871273 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -394,53 +394,53 @@ public static class TimestampPicos {
       this.seconds = seconds;
       this.picoseconds = picoseconds;
     }
-  }
-
-  /**
-   * Parses a timestamp string into seconds and picoseconds components.
-   *
-   * <p>Handles two formats:
-   *
-   * <ul>
-   *   <li>ISO format with exactly 12 fractional digits ending in Z (picosecond precision): e.g.,
-   *       "2024-01-15T10:30:45.123456789012Z"
-   *   <li>UTC format with 0-9 fractional digits ending in "UTC" (up to nanosecond precision): e.g.,
-   *       "2024-01-15 10:30:45.123456789 UTC", "2024-01-15 10:30:45 UTC"
-   * </ul>
-   */
-  public static TimestampPicos parseTimestampPicosFromString(String timestampString) {
-    // Check for ISO picosecond format up to 12 fractional digits before Z
-    // Format: "2024-01-15T10:30:45.123456789012Z"
-    if (timestampString.endsWith("Z")) {
-      int dotIndex = timestampString.lastIndexOf('.');
-
-      if (dotIndex > 0) {
-        String fractionalPart =
-            timestampString.substring(dotIndex + 1, timestampString.length() - 1);
-
-        if ((long) fractionalPart.length() == PICOSECOND_PRECISION) {
-          // ISO timestamp with 12 decimal digits (picosecond precision)
-          // Parse the datetime part (without fractional seconds)
-          String dateTimePart = timestampString.substring(0, dotIndex) + "Z";
-          java.time.Instant baseInstant = java.time.Instant.parse(dateTimePart);
-
-          // Parse all 12 digits directly as picoseconds (subsecond portion)
-          long picoseconds = Long.parseLong(fractionalPart);
-
-          return new TimestampPicos(baseInstant.getEpochSecond(), picoseconds);
+    /**
+     * Parses a timestamp string into seconds and picoseconds components.
+     *
+     * <p>Handles two formats:
+     *
+     * <ul>
+     *   <li>ISO format with exactly 12 fractional digits ending in Z (picosecond precision): e.g.,
+     *       "2024-01-15T10:30:45.123456789012Z"
+     *   <li>UTC format with 0-9 fractional digits ending in "UTC" (up to nanosecond precision):
+     *       e.g., "2024-01-15 10:30:45.123456789 UTC", "2024-01-15 10:30:45 UTC"
+     * </ul>
+ */ + public static TimestampPicos fromString(String timestampString) { + // Check for ISO picosecond format up to 12 fractional digits before Z + // Format: "2024-01-15T10:30:45.123456789012Z" + if (timestampString.endsWith("Z")) { + int dotIndex = timestampString.lastIndexOf('.'); + + if (dotIndex > 0) { + String fractionalPart = + timestampString.substring(dotIndex + 1, timestampString.length() - 1); + + if ((long) fractionalPart.length() == PICOSECOND_PRECISION) { + // ISO timestamp with 12 decimal digits (picosecond precision) + // Parse the datetime part (without fractional seconds) + String dateTimePart = timestampString.substring(0, dotIndex) + "Z"; + java.time.Instant baseInstant = java.time.Instant.parse(dateTimePart); + + // Parse all 12 digits directly as picoseconds (subsecond portion) + long picoseconds = Long.parseLong(fractionalPart); + + return new TimestampPicos(baseInstant.getEpochSecond(), picoseconds); + } } + + // ISO format with 0-9 fractional digits - Instant.parse handles this + java.time.Instant timestamp = java.time.Instant.parse(timestampString); + return new TimestampPicos(timestamp.getEpochSecond(), timestamp.getNano() * 1000L); } - // ISO format with 0-9 fractional digits - Instant.parse handles this - java.time.Instant timestamp = java.time.Instant.parse(timestampString); + // UTC format: "2024-01-15 10:30:45.123456789 UTC" + // Use TIMESTAMP_FORMATTER which handles space separator and "UTC" suffix + java.time.Instant timestamp = + java.time.Instant.from(TIMESTAMP_FORMATTER.parse(timestampString)); return new TimestampPicos(timestamp.getEpochSecond(), timestamp.getNano() * 1000L); } - - // UTC format: "2024-01-15 10:30:45.123456789 UTC" - // Use TIMESTAMP_FORMATTER which handles space separator and "UTC" suffix - java.time.Instant timestamp = - java.time.Instant.from(TIMESTAMP_FORMATTER.parse(timestampString)); - return new TimestampPicos(timestamp.getEpochSecond(), timestamp.getNano() * 1000L); } /** diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java index 5ab2de52648b..04a678b8ee7d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java @@ -69,6 +69,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.TimestampPicos; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions; @@ -1370,8 +1371,7 @@ public static ByteString mergeNewFields( long picoseconds; if (value instanceof String) { - BigQueryUtils.TimestampPicos parsed = - BigQueryUtils.parseTimestampPicosFromString((String) value); + TimestampPicos parsed = TimestampPicos.fromString((String) value); seconds = parsed.seconds; picoseconds = parsed.picoseconds; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index f929c47af36e..76a492bebd2c 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java @@ -1546,7 +1546,7 @@ public void testParseTimestampPicosFromString() { long expectedSecs = expectedSeconds(isoEquivalent); - BigQueryUtils.TimestampPicos result = BigQueryUtils.parseTimestampPicosFromString(input); + BigQueryUtils.TimestampPicos result = BigQueryUtils.TimestampPicos.fromString(input); assertEquals( String.format("Seconds mismatch for '%s' (%s)", input, description), From 5bbedc9153c3f1c07f2f8ba0a29dba693b4fae78 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 30 Dec 2025 16:47:53 -0500 Subject: [PATCH 7/7] handle joda instant. --- .../sdk/io/gcp/bigquery/TableRowToStorageApiProto.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java index 04a678b8ee7d..ab5ae80065a4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java @@ -1375,8 +1375,11 @@ public static ByteString mergeNewFields( seconds = parsed.seconds; picoseconds = parsed.picoseconds; - } else if (value instanceof Instant) { - Instant timestamp = (Instant) value; + } else if (value instanceof Instant || value instanceof org.joda.time.Instant) { + Instant timestamp = + value instanceof Instant + ? (Instant) value + : Instant.ofEpochMilli(((org.joda.time.Instant) value).getMillis()); seconds = timestamp.getEpochSecond(); picoseconds = timestamp.getNano() * 1000L; } else {
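+          // Joda Instant stores only epoch milliseconds, so a value such as
+          // new org.joda.time.Instant(1705314645123L) converts to
+          // seconds = 1705314645 and picoseconds = 123000000000; any
+          // sub-millisecond digits are zeroed by the branch above.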