@@ -102,9 +102,15 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTyp
// boolean
return SchemaBuilder.builder().booleanType();
case "TIMESTAMP":
// in Extract Jobs, it always uses the Avro logical type
// we may have to change this if we move to EXPORT DATA
return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
if (schema.getTimestampPrecision() == null || schema.getTimestampPrecision() == 6) {
// in Extract Jobs, it always uses the Avro logical type
// we may have to change this if we move to EXPORT DATA
return LogicalTypes.timestampMicros().addToSchema(SchemaBuilder.builder().longType());
}
return SchemaBuilder.builder()
.longBuilder()
.prop("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE)
.endLong();
case "DATE":
if (useAvroLogicalTypes) {
return LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
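Note on the new TIMESTAMP branch above: when the field's precision is unset or 6, the existing timestamp-micros logical type is kept; any other precision falls back to a plain Avro long that only carries a custom logicalType property. Below is a minimal standalone sketch of what that produces and how a reader could detect it; the string value of TIMESTAMP_NANOS_LOGICAL_TYPE is an assumption here, since the constant's definition is outside this hunk.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class TimestampNanosSchemaSketch {
  // Assumed value of the constant referenced in the diff; its definition is not shown in this hunk.
  static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos";

  public static void main(String[] args) {
    // Same construction as the new branch: a bare long carrying a "logicalType" prop.
    Schema nanos =
        SchemaBuilder.builder()
            .longBuilder()
            .prop("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE)
            .endLong();

    System.out.println(nanos.getType());              // LONG
    System.out.println(nanos.getProp("logicalType")); // timestamp-nanos
    System.out.println(nanos.getLogicalType());       // null: only the prop is set,
                                                      // no Avro LogicalType object is attached
  }
}
```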
@@ -408,6 +408,10 @@ private static org.apache.avro.Schema createTimestampNanosAvroSchema() {
.name("ts_nanos")
.type(longSchema)
.noDefault()
.name("ts_picos")
.type()
.stringType()
.noDefault()
.endRecord();
}

@@ -421,12 +425,12 @@ private static org.apache.avro.Schema createTimestampNanosAvroSchema() {
public void testWriteGenericRecordTimestampNanos() throws Exception {
String tableSpec =
String.format("%s:%s.%s", project, DATASET_ID, "generic_record_ts_nanos_test");

// Create GenericRecord with timestamp-nanos value
GenericRecord record =
new GenericRecordBuilder(TIMESTAMP_NANOS_AVRO_SCHEMA)
.set(
"ts_nanos", TEST_INSTANT.getEpochSecond() * 1_000_000_000L + TEST_INSTANT.getNano())
.set("ts_picos", "2024-01-15T10:30:45.123456789123Z")
.build();

// Write using Storage Write API with Avro format
@@ -437,7 +441,6 @@ public void testWriteGenericRecordTimestampNanos() throws Exception {
"WriteGenericRecords",
BigQueryIO.writeGenericRecords()
.to(tableSpec)
.withAvroSchemaFactory(tableSchema -> TIMESTAMP_NANOS_AVRO_SCHEMA)
.withSchema(BigQueryUtils.fromGenericAvroSchema(TIMESTAMP_NANOS_AVRO_SCHEMA, true))
.useAvroLogicalTypes()
.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
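For context on the record written above: ts_nanos is encoded as nanoseconds since the epoch in a plain long, using the inline expression on the TEST_INSTANT line, while the new ts_picos field carries a picosecond-precision timestamp as an ISO-8601 string. A hypothetical helper, not part of the PR, showing the same epoch-nanos conversion with overflow-checked arithmetic (a signed long only covers epoch nanoseconds up to roughly the year 2262):

```java
import java.time.Instant;

public class EpochNanosSketch {
  // Same arithmetic as the inline expression in the test, wrapped with overflow checks.
  static long toEpochNanos(Instant instant) {
    return Math.addExact(
        Math.multiplyExact(instant.getEpochSecond(), 1_000_000_000L), instant.getNano());
  }

  public static void main(String[] args) {
    // 2024-01-15T10:30:45.123456789Z -> 1705314645123456789
    System.out.println(toEpochNanos(Instant.parse("2024-01-15T10:30:45.123456789Z")));
  }
}
```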
@@ -457,12 +460,18 @@ public void testWriteGenericRecordTimestampNanos() throws Exception {
.from(tableSpec));

PAssert.that(result)
.containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z"));
.containsInAnyOrder(
new TableRow()
.set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")
.set("ts_picos", "2024-01-15T10:30:45.123456789123Z"));
readPipeline.run().waitUntilFinish();
}

private static final Schema BEAM_TIMESTAMP_NANOS_SCHEMA =
Schema.builder().addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS)).build();
Schema.builder()
.addField("ts_nanos", Schema.FieldType.logicalType(Timestamp.NANOS))
.addField("ts_picos", Schema.FieldType.STRING)
.build();

@Test
public void testWriteBeamRowTimestampNanos() throws Exception {
@@ -472,6 +481,7 @@ public void testWriteBeamRowTimestampNanos() throws Exception {
Row row =
Row.withSchema(BEAM_TIMESTAMP_NANOS_SCHEMA)
.withFieldValue("ts_nanos", TEST_INSTANT)
.withFieldValue("ts_picos", "2024-01-15T10:30:45.123456789123Z")
.build();

// Write using Storage Write API with Beam Schema
@@ -500,7 +510,10 @@ public void testWriteBeamRowTimestampNanos() throws Exception {
.from(tableSpec));

PAssert.that(result)
.containsInAnyOrder(new TableRow().set("ts_nanos", "2024-01-15T10:30:45.123456789000Z"));
.containsInAnyOrder(
new TableRow()
.set("ts_nanos", "2024-01-15T10:30:45.123456789000Z")
.set("ts_picos", "2024-01-15T10:30:45.123456789123Z"));
readPipeline.run().waitUntilFinish();
}
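On the assertions in both tests: ts_nanos comes back as an ISO-8601 string with twelve fractional digits (the nanoseconds padded with three trailing zeros), while the ts_picos string round-trips unchanged. A hypothetical helper, not part of the PR, that produces the expected ts_nanos form from a java.time.Instant:

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class ExpectedTimestampStringSketch {
  // Build the 12-fractional-digit form the PAssert checks expect, e.g.
  // 2024-01-15T10:30:45.123456789Z -> "2024-01-15T10:30:45.123456789000Z".
  static String toExpectedTableRowString(Instant instant) {
    String seconds = instant.truncatedTo(ChronoUnit.SECONDS).toString(); // e.g. "...T10:30:45Z"
    return String.format(
        "%s.%09d000Z", seconds.substring(0, seconds.length() - 1), instant.getNano());
  }

  public static void main(String[] args) {
    System.out.println(toExpectedTableRowString(Instant.parse("2024-01-15T10:30:45.123456789Z")));
  }
}
```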
