diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index e1ff0f58f148..46a014f8196b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -102,9 +102,15 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy // boolean return SchemaBuilder.builder().booleanType(); case "TIMESTAMP": - // in Extract Jobs, it always uses the Avro logical type - // we may have to change this if we move to EXPORT DATA - return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType()); + if (schema.getTimestampPrecision() == null || schema.getTimestampPrecision() == 6) { + // in Extract Jobs, it always uses the Avro logical type + // we may have to change this if we move to EXPORT DATA + return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType()); + } + return SchemaBuilder.builder() + .longBuilder() + .prop("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE) + .endLong(); case "DATE": if (useAvroLogicalTypes) { return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java index 6d155185ee62..07b6adf46bcd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java @@ -408,6 +408,10 @@ private static org.apache.avro.Schema createTimestampNanosAvroSchema() { .name("ts_nanos") .type(longSchema) .noDefault() + .name("ts_picos") + .type() + .stringType() + .noDefault() .endRecord(); } @@ -421,12 +425,12 @@ private static org.apache.avro.Schema createTimestampNanosAvroSchema() { public void testWriteGenericRecordTimestampNanos() throws Exception { String tableSpec = String.format("%s:%s.%s", project, DATASET_ID, "generic_record_ts_nanos_test"); - // Create GenericRecord with timestamp-nanos value GenericRecord record = new GenericRecordBuilder(TIMESTAMP_NANOS_AVRO_SCHEMA) .set( "ts_nanos", TEST_INSTANT.getEpochSecond() * 1_000_000_000L + TEST_INSTANT.getNano()) + .set("ts_picos", "2024-01-15T10:30:45.123456789123Z") .build(); // Write using Storage Write API with Avro format @@ -437,7 +441,6 @@ public void testWriteGenericRecordTimestampNanos() throws Exception { "WriteGenericRecords", BigQueryIO.writeGenericRecords() .to(tableSpec) - .withAvroSchemaFactory(tableSchema -> TIMESTAMP_NANOS_AVRO_SCHEMA) .withSchema(BigQueryUtils.fromGenericAvroSchema(TIMESTAMP_NANOS_AVRO_SCHEMA, true)) .useAvroLogicalTypes() .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) @@ -457,12 +460,18 @@ public void testWriteGenericRecordTimestampNanos() throws Exception { .from(tableSpec)); PAssert.that(result) - .containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")); + .containsInAnyOrder( + new TableRow() + .set("ts_nanos", "2024-01-15T10:30:45.123456789000Z") + .set("ts_picos", "2024-01-15T10:30:45.123456789123Z")); readPipeline.run().waitUntilFinish(); } private static final Schema BEAM_TIMESTAMP_NANOS_SCHEMA = - Schema.builder().addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS)).build(); + Schema.builder() + .addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS)) + .addField("ts_picos", Schema.FieldType.STRING) + .build(); @Test public void testWriteBeamRowTimestampNanos() throws Exception { @@ -472,6 +481,7 @@ public void testWriteBeamRowTimestampNanos() throws Exception { Row row = Row.withSchema(BEAM_TIMESTAMP_NANOS_SCHEMA) .withFieldValue("ts_nanos", TEST_INSTANT) + .withFieldValue("ts_picos", "2024-01-15T10:30:45.123456789123Z") .build(); // Write using Storage Write API with Beam Schema @@ -500,7 +510,10 @@ public void testWriteBeamRowTimestampNanos() throws Exception { .from(tableSpec)); PAssert.that(result) - .containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")); + .containsInAnyOrder( + new TableRow() + .set("ts_nanos", "2024-01-15T10:30:45.123456789000Z") + .set("ts_picos", "2024-01-15T10:30:45.123456789123Z")); readPipeline.run().waitUntilFinish(); }