diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
index 76174ac3d04d..35751e2758e1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java
@@ -348,6 +348,9 @@ private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Sch
             fieldDescriptorFromAvroField(
                 new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal()));
         builder = builder.setType(elementFieldSchema.getType());
+        if (elementFieldSchema.hasTimestampPrecision()) {
+          builder.setTimestampPrecision(elementFieldSchema.getTimestampPrecision());
+        }
         builder.addAllFields(elementFieldSchema.getFieldsList());
         builder = builder.setMode(TableFieldSchema.Mode.REPEATED);
         break;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
index adb8e4468c00..d940ff8dd7fc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
@@ -237,6 +237,9 @@ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) {
         TableFieldSchema elementFieldSchema =
             fieldDescriptorFromBeamField(Field.of(field.getName(), elementType));
         builder = builder.setType(elementFieldSchema.getType());
+        if (elementFieldSchema.hasTimestampPrecision()) {
+          builder = builder.setTimestampPrecision(elementFieldSchema.getTimestampPrecision());
+        }
         builder.addAllFields(elementFieldSchema.getFieldsList());
         builder = builder.setMode(TableFieldSchema.Mode.REPEATED);
         break;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index e1ff0f58f148..46a014f8196b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -102,9 +102,15 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy
         // boolean
         return SchemaBuilder.builder().booleanType();
       case "TIMESTAMP":
-        // in Extract Jobs, it always uses the Avro logical type
-        // we may have to change this if we move to EXPORT DATA
-        return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
+        if (schema.getTimestampPrecision() == null || schema.getTimestampPrecision() == 6) {
+          // in Extract Jobs, it always uses the Avro logical type
+          // we may have to change this if we move to EXPORT DATA
+          return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
+        }
+        return SchemaBuilder.builder()
+            .longBuilder()
+            .prop("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE)
+            .endLong();
       case "DATE":
         if (useAvroLogicalTypes) {
           return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
index deabb1dd05fc..9698aaff1d73 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java
@@ -348,7 +348,23 @@ private static Schema createTimestampNanosSchema() {
         .endRecord();
   }
 
+  private static Schema createRepeatedTimestampNanosSchema() {
+    Schema longSchema = Schema.create(Schema.Type.LONG);
+    longSchema.addProp("logicalType", "timestamp-nanos");
+
+    Schema arraySchema = Schema.createArray(longSchema);
+
+    return SchemaBuilder.record("RepeatedTimestampNanosRecord")
+        .fields()
+        .name("timestampNanosArray")
+        .type(arraySchema)
+        .noDefault()
+        .endRecord();
+  }
+
   private static final Schema TIMESTAMP_NANOS_SCHEMA = createTimestampNanosSchema();
+  private static final Schema REPEATED_TIMESTAMP_NANOS_SCHEMA =
+      createRepeatedTimestampNanosSchema();
 
   private static GenericRecord baseRecord;
   private static GenericRecord rawLogicalTypesRecord;
@@ -885,4 +901,22 @@ public void testProtoTableSchemaFromAvroSchemaTimestampNanos() {
     assertTrue(field.hasTimestampPrecision());
     assertEquals(12L, field.getTimestampPrecision().getValue());
   }
+
+  @Test
+  public void testProtoTableSchemaFromAvroSchemaRepeatedTimestampNanos() {
+    com.google.cloud.bigquery.storage.v1.TableSchema protoSchema =
+        AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(
+            REPEATED_TIMESTAMP_NANOS_SCHEMA);
+
+    assertEquals(1, protoSchema.getFieldsCount());
+    com.google.cloud.bigquery.storage.v1.TableFieldSchema field = protoSchema.getFields(0);
+
+    assertEquals("timestampnanosarray", field.getName());
+    assertEquals(
+        com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.TIMESTAMP, field.getType());
+    assertEquals(
+        com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode.REPEATED, field.getMode());
+
+    assertEquals(12L, field.getTimestampPrecision().getValue());
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
index d7a88615a50b..c546a7ca5d77 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
@@ -69,6 +69,10 @@ public class BeamRowToStorageApiProtoTest {
       Schema.builder()
           .addField("timestampNanos", FieldType.logicalType(Timestamp.NANOS).withNullable(true))
           .build();
+  private static final Schema TIMESTAMP_NANOS_ARRAY_SCHEMA =
+      Schema.builder()
+          .addField("timestampNanosArray", FieldType.array(FieldType.logicalType(Timestamp.NANOS)))
+          .build();
   private static final EnumerationType TEST_ENUM =
       EnumerationType.create("ONE", "TWO", "RED", "BLUE");
   private static final Schema BASE_SCHEMA =
@@ -614,6 +618,19 @@ public void testTimestampNanosSchema() {
     assertEquals(12L, field.getTimestampPrecision().getValue());
   }
 
+  @Test
+  public void testTimestampNanosArraySchema() {
+    com.google.cloud.bigquery.storage.v1.TableSchema protoSchema =
+        BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(TIMESTAMP_NANOS_ARRAY_SCHEMA);
+
+    assertEquals(1, protoSchema.getFieldsCount());
+    TableFieldSchema field = protoSchema.getFields(0);
+    assertEquals(TableFieldSchema.Type.TIMESTAMP, field.getType());
+    assertEquals(
+        com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode.REPEATED, field.getMode());
+    assertEquals(12L, field.getTimestampPrecision().getValue());
+  }
+
   @Test
   public void testTimestampNanosDescriptor() throws Exception {
     DescriptorProto descriptor =
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java
index 6d155185ee62..07b6adf46bcd 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTimestampPicosIT.java
@@ -408,6 +408,10 @@ private static org.apache.avro.Schema createTimestampNanosAvroSchema() {
         .name("ts_nanos")
         .type(longSchema)
         .noDefault()
+        .name("ts_picos")
+        .type()
+        .stringType()
+        .noDefault()
         .endRecord();
   }
 
@@ -421,12 +425,12 @@ private static org.apache.avro.Schema createTimestampNanosAvroSchema()
   public void testWriteGenericRecordTimestampNanos() throws Exception {
     String tableSpec =
         String.format("%s:%s.%s", project, DATASET_ID, "generic_record_ts_nanos_test");
-    // Create GenericRecord with timestamp-nanos value
     GenericRecord record =
         new GenericRecordBuilder(TIMESTAMP_NANOS_AVRO_SCHEMA)
             .set(
                 "ts_nanos",
                 TEST_INSTANT.getEpochSecond() * 1_000_000_000L + TEST_INSTANT.getNano())
+            .set("ts_picos", "2024-01-15T10:30:45.123456789123Z")
             .build();
 
     // Write using Storage Write API with Avro format
@@ -437,7 +441,6 @@ public void testWriteGenericRecordTimestampNanos() throws Exception {
             "WriteGenericRecords",
             BigQueryIO.writeGenericRecords()
                 .to(tableSpec)
-                .withAvroSchemaFactory(tableSchema -> TIMESTAMP_NANOS_AVRO_SCHEMA)
                 .withSchema(BigQueryUtils.fromGenericAvroSchema(TIMESTAMP_NANOS_AVRO_SCHEMA, true))
                 .useAvroLogicalTypes()
                 .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
@@ -457,12 +460,18 @@ public void testWriteGenericRecordTimestampNanos() throws Exception {
                 .from(tableSpec));
 
     PAssert.that(result)
-        .containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z"));
+        .containsInAnyOrder(
+            new TableRow()
+                .set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")
+                .set("ts_picos", "2024-01-15T10:30:45.123456789123Z"));
 
     readPipeline.run().waitUntilFinish();
   }
 
   private static final Schema BEAM_TIMESTAMP_NANOS_SCHEMA =
-      Schema.builder().addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS)).build();
+      Schema.builder()
+          .addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS))
+          .addField("ts_picos", Schema.FieldType.STRING)
+          .build();
   @Test
   public void testWriteBeamRowTimestampNanos() throws Exception {
@@ -472,6 +481,7 @@ public void testWriteBeamRowTimestampNanos() throws Exception {
     Row row =
         Row.withSchema(BEAM_TIMESTAMP_NANOS_SCHEMA)
             .withFieldValue("ts_nanos", TEST_INSTANT)
+            .withFieldValue("ts_picos", "2024-01-15T10:30:45.123456789123Z")
            .build();
 
     // Write using Storage Write API with Beam Schema
@@ -500,7 +510,10 @@ public void testWriteBeamRowTimestampNanos() throws Exception {
                 .from(tableSpec));
 
     PAssert.that(result)
-        .containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z"));
+        .containsInAnyOrder(
+            new TableRow()
+                .set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")
+                .set("ts_picos", "2024-01-15T10:30:45.123456789123Z"));
 
     readPipeline.run().waitUntilFinish();
   }
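
A minimal sketch (not part of the patch) of the behaviour the main-code hunks add: an Avro array of "timestamp-nanos" longs now converts to a REPEATED TIMESTAMP field whose timestamp precision is carried over from the element schema, mirroring testProtoTableSchemaFromAvroSchemaRepeatedTimestampNanos above. The class name and main method are hypothetical, and the sketch assumes protoTableSchemaFromAvroSchema is visible from the chosen package (the example reuses the converter's own package to be safe).

// Hypothetical illustration class; placed in the converter's package so visibility is not an issue.
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class RepeatedTimestampNanosSketch {
  public static void main(String[] args) {
    // An Avro long carrying the "timestamp-nanos" logical type, wrapped in an array,
    // matching the REPEATED_TIMESTAMP_NANOS_SCHEMA fixture added by this change.
    Schema longSchema = Schema.create(Schema.Type.LONG);
    longSchema.addProp("logicalType", "timestamp-nanos");
    Schema recordSchema =
        SchemaBuilder.record("RepeatedTimestampNanosRecord")
            .fields()
            .name("timestampNanosArray")
            .type(Schema.createArray(longSchema))
            .noDefault()
            .endRecord();

    TableSchema protoSchema =
        AvroGenericRecordToStorageApiProto.protoTableSchemaFromAvroSchema(recordSchema);
    TableFieldSchema field = protoSchema.getFields(0);

    // With the patch, the element's precision survives the REPEATED wrapping.
    System.out.println(field.getMode()); // REPEATED
    System.out.println(field.getType()); // TIMESTAMP
    System.out.println(field.getTimestampPrecision().getValue()); // 12, per the new tests
  }
}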