Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test, dependsOn: copyG
exclude '**/BigQueryIODynamicQueryIT.class'
exclude '**/BigQueryIODynamicReadIT.class'
exclude '**/BigQueryIODynamicReadTableRowIT.class'
// Exclude patterns are matched against compiled test classes, so the pattern must end in
// ".class" like its siblings; a ".java" pattern matches nothing and the test would still run.
exclude '**/BigQueryTimestampPicosIT.class'
exclude '**/PubsubReadIT.class'
exclude '**/FhirIOReadIT.class'
exclude '**/DicomIOReadIT.class'
Expand Down Expand Up @@ -698,6 +699,7 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) {
exclude '**/BigQueryIODynamicQueryIT.class'
exclude '**/BigQueryIODynamicReadIT.class'
exclude '**/BigQueryIODynamicReadTableRowIT.class'
// Exclude patterns are matched against compiled test classes, so the pattern must end in
// ".class" like its siblings; a ".java" pattern matches nothing and the test would still run.
exclude '**/BigQueryTimestampPicosIT.class'
exclude '**/SpannerWriteIT.class'
exclude '**/*KmsKeyIT.class'
exclude '**/FhirIOReadIT.class'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ public class BigQueryUtils {
+ "(?<DATASET>[a-zA-Z0-9_]{1,1024})[\\.]"
+ "(?<TABLE>[\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}\\p{Zs}$]{1,1024})$");

/**
 * Number of fractional-second digits in a picosecond-precision timestamp string; the timestamp
 * parser compares the length of the fractional part against this value.
 */
private static final long PICOSECOND_PRECISION = 12L;

/** Options for how to convert BigQuery data to Beam data. */
@AutoValue
public abstract static class ConversionOptions implements Serializable {
Expand Down Expand Up @@ -380,6 +382,67 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {
return ret;
}

/**
 * Represents a timestamp with picosecond precision, split into seconds and picoseconds
 * components.
 */
public static class TimestampPicos {
  /** Largest number of fractional-second digits that {@code Instant.parse} accepts. */
  private static final int MAX_NANOSECOND_DIGITS = 9;

  /** Whole seconds since the epoch, as reported by {@code Instant.getEpochSecond()}. */
  final long seconds;

  /** Sub-second portion of the timestamp, expressed in picoseconds. */
  final long picoseconds;

  TimestampPicos(long seconds, long picoseconds) {
    this.seconds = seconds;
    this.picoseconds = picoseconds;
  }

  /**
   * Parses a timestamp string into seconds and picoseconds components.
   *
   * <p>Handles the following formats:
   *
   * <ul>
   *   <li>ISO format with 10 to 12 fractional digits ending in Z (sub-nanosecond precision):
   *       e.g., "2024-01-15T10:30:45.123456789012Z". Fewer than 12 digits are right-padded with
   *       zeros, so ".1234567891Z" yields 123456789100 picoseconds.
   *   <li>ISO format with 0-9 fractional digits ending in Z, as accepted by {@code
   *       Instant.parse}.
   *   <li>UTC format with 0-9 fractional digits ending in "UTC" (up to nanosecond precision):
   *       e.g., "2024-01-15 10:30:45.123456789 UTC", "2024-01-15 10:30:45 UTC"
   * </ul>
   *
   * @param timestampString the timestamp text to parse
   * @return the parsed seconds and picoseconds
   * @throws NumberFormatException if a 10-12 character fractional part contains anything other
   *     than the digits 0-9
   * @throws java.time.DateTimeException if the string matches none of the supported formats
   */
  public static TimestampPicos fromString(String timestampString) {
    if (timestampString.endsWith("Z")) {
      int dotIndex = timestampString.lastIndexOf('.');

      if (dotIndex > 0) {
        String fractionalPart =
            timestampString.substring(dotIndex + 1, timestampString.length() - 1);
        int fractionalDigits = fractionalPart.length();

        // Instant.parse rejects more than 9 fractional digits, so 10-12 digits are split off
        // and converted by hand. Anything longer still falls through and is rejected below.
        if (fractionalDigits > MAX_NANOSECOND_DIGITS && fractionalDigits <= PICOSECOND_PRECISION) {
          return parseSubNanosecondIso(timestampString, dotIndex, fractionalPart);
        }
      }

      // ISO format with 0-9 fractional digits - Instant.parse handles this
      java.time.Instant timestamp = java.time.Instant.parse(timestampString);
      return new TimestampPicos(timestamp.getEpochSecond(), timestamp.getNano() * 1000L);
    }

    // UTC format: "2024-01-15 10:30:45.123456789 UTC"
    // Use TIMESTAMP_FORMATTER which handles space separator and "UTC" suffix
    java.time.Instant timestamp =
        java.time.Instant.from(TIMESTAMP_FORMATTER.parse(timestampString));
    return new TimestampPicos(timestamp.getEpochSecond(), timestamp.getNano() * 1000L);
  }

  /** Parses an ISO timestamp whose fractional part has 10-12 digits, scaling it to picoseconds. */
  private static TimestampPicos parseSubNanosecondIso(
      String timestampString, int dotIndex, String fractionalPart) {
    // Long.parseLong would accept a leading sign; only plain ASCII digits are a valid fraction.
    for (int i = 0; i < fractionalPart.length(); i++) {
      char c = fractionalPart.charAt(i);
      if (c < '0' || c > '9') {
        throw new NumberFormatException(
            "Invalid fractional seconds \""
                + fractionalPart
                + "\" in timestamp: "
                + timestampString);
      }
    }

    // Parse the datetime part (without fractional seconds)
    java.time.Instant baseInstant =
        java.time.Instant.parse(timestampString.substring(0, dotIndex) + "Z");

    // Right-pad to 12 digits: each missing trailing digit is one more factor of ten.
    long picoseconds = Long.parseLong(fractionalPart);
    for (int i = fractionalPart.length(); i < PICOSECOND_PRECISION; i++) {
      picoseconds *= 10L;
    }

    return new TimestampPicos(baseInstant.getEpochSecond(), picoseconds);
  }
}

/**
* Get the Beam {@link FieldType} from a BigQuery type name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.TimestampPicos;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions;
Expand Down Expand Up @@ -191,6 +192,23 @@ private static String getPrettyFieldName(String fullName) {
.put(TableFieldSchema.Type.JSON, "JSON")
.build();

/**
 * Descriptor for the "TimestampPicos" message: two INT64 fields, "seconds" (field number 1) and
 * "picoseconds" (field number 2).
 */
static final DescriptorProto TIMESTAMP_PICOS_DESCRIPTOR_PROTO =
    DescriptorProto.newBuilder()
        .setName("TimestampPicos")
        // Field 1: seconds component.
        .addField(
            DescriptorProtos.FieldDescriptorProto.newBuilder()
                .setNumber(1)
                .setName("seconds")
                .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64))
        // Field 2: picoseconds component.
        .addField(
            DescriptorProtos.FieldDescriptorProto.newBuilder()
                .setNumber(2)
                .setName("picoseconds")
                .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64))
        .build();

@FunctionalInterface
public interface ThrowingBiFunction<FirstInputT, SecondInputT, OutputT> {
OutputT apply(FirstInputT t, SecondInputT u) throws SchemaConversionException;
Expand All @@ -199,6 +217,8 @@ public interface ThrowingBiFunction<FirstInputT, SecondInputT, OutputT> {
static final DecimalFormat DECIMAL_FORMAT =
new DecimalFormat("0.0###############", DecimalFormatSymbols.getInstance(Locale.ROOT));

/**
 * Timestamp precision value that marks a TIMESTAMP field as picosecond-precision; field schemas
 * whose timestamp precision equals this value are handled as TimestampPicos messages.
 */
private static final long PICOSECOND_PRECISION = 12L;

// Map of functions to convert json values into the value expected in the Vortex proto object.
static final Map<TableFieldSchema.Type, ThrowingBiFunction<String, Object, @Nullable Object>>
TYPE_MAP_PROTO_CONVERTERS =
Expand Down Expand Up @@ -533,6 +553,9 @@ public static TableFieldSchema tableFieldToProtoTableField(
if (field.getScale() != null) {
builder.setScale(field.getScale());
}
if (field.getTimestampPrecision() != null) {
builder.getTimestampPrecisionBuilder().setValue(field.getTimestampPrecision());
}
builder.setType(typeToProtoType(field.getType()));
if (builder.getType().equals(TableFieldSchema.Type.STRUCT)) {
for (com.google.api.services.bigquery.model.TableFieldSchema subField : field.getFields()) {
Expand Down Expand Up @@ -587,6 +610,10 @@ public boolean isRepeated() {
return tableFieldSchema.getMode().equals(TableFieldSchema.Mode.REPEATED);
}

/** Returns the value held by the timestamp precision of the underlying field schema. */
public long getTimestampPrecision() {
  final long precisionValue = tableFieldSchema.getTimestampPrecision().getValue();
  return precisionValue;
}

public SchemaInformation getSchemaForField(String name) {
SchemaInformation schemaInformation = subFieldsByName.get(name.toLowerCase());
if (schemaInformation == null) {
Expand Down Expand Up @@ -631,7 +658,6 @@ static SchemaInformation fromTableSchema(
.put(TableFieldSchema.Type.DATE, Type.TYPE_INT32)
.put(TableFieldSchema.Type.TIME, Type.TYPE_INT64)
.put(TableFieldSchema.Type.DATETIME, Type.TYPE_INT64)
.put(TableFieldSchema.Type.TIMESTAMP, Type.TYPE_INT64)
.put(TableFieldSchema.Type.JSON, Type.TYPE_STRING)
.build();

Expand Down Expand Up @@ -957,10 +983,16 @@ static TableFieldSchema tableFieldSchemaFromDescriptorField(FieldDescriptor fiel

switch (fieldDescriptor.getType()) {
case MESSAGE:
tableFieldSchemaBuilder = tableFieldSchemaBuilder.setType(TableFieldSchema.Type.STRUCT);
TableSchema nestedTableField = tableSchemaFromDescriptor(fieldDescriptor.getMessageType());
tableFieldSchemaBuilder =
tableFieldSchemaBuilder.addAllFields(nestedTableField.getFieldsList());
if (fieldDescriptor.getMessageType().getName().equals("TimestampPicos")) {
  // A TimestampPicos message is a picosecond-precision TIMESTAMP, not a nested STRUCT.
  tableFieldSchemaBuilder.setType(TableFieldSchema.Type.TIMESTAMP);
  // Record the precision in the timestamp-precision wrapper, which is what the rest of this
  // class reads via getTimestampPrecision(); setPrecision() is a different schema field and
  // would leave the picosecond marker invisible to those readers.
  tableFieldSchemaBuilder.getTimestampPrecisionBuilder().setValue(PICOSECOND_PRECISION);
} else {
  // Any other message type maps to a STRUCT whose sub-fields come from the nested descriptor.
  tableFieldSchemaBuilder = tableFieldSchemaBuilder.setType(TableFieldSchema.Type.STRUCT);
  TableSchema nestedTableField =
      tableSchemaFromDescriptor(fieldDescriptor.getMessageType());
  tableFieldSchemaBuilder =
      tableFieldSchemaBuilder.addAllFields(nestedTableField.getFieldsList());
}
break;
default:
TableFieldSchema.Type type = PRIMITIVE_TYPES_PROTO_TO_BQ.get(fieldDescriptor.getType());
Expand Down Expand Up @@ -1060,6 +1092,25 @@ private static void fieldDescriptorFromTableField(
fieldDescriptorBuilder =
fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
break;
case TIMESTAMP:
if (fieldSchema.getTimestampPrecision().getValue() == PICOSECOND_PRECISION) {
boolean typeAlreadyExists =
descriptorBuilder.getNestedTypeList().stream()
.anyMatch(d -> TIMESTAMP_PICOS_DESCRIPTOR_PROTO.getName().equals(d.getName()));

if (!typeAlreadyExists) {
descriptorBuilder.addNestedType(TIMESTAMP_PICOS_DESCRIPTOR_PROTO);
}
fieldDescriptorBuilder =
fieldDescriptorBuilder
.setType(Type.TYPE_MESSAGE)
.setTypeName(TIMESTAMP_PICOS_DESCRIPTOR_PROTO.getName());
} else {
// Microsecond precision - use simple INT64
fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
}
break;

default:
@Nullable Type type = PRIMITIVE_TYPES_BQ_TO_PROTO.get(fieldSchema.getType());
if (type == null) {
Expand Down Expand Up @@ -1313,6 +1364,36 @@ public static ByteString mergeNewFields(
null,
null);
}
} else if (schemaInformation.getType() == TableFieldSchema.Type.TIMESTAMP
&& schemaInformation.getTimestampPrecision() == PICOSECOND_PRECISION) {

long seconds;
long picoseconds;

if (value instanceof String) {
TimestampPicos parsed = TimestampPicos.fromString((String) value);
seconds = parsed.seconds;
picoseconds = parsed.picoseconds;

} else if (value instanceof Instant || value instanceof org.joda.time.Instant) {
Instant timestamp =
value instanceof Instant
? (Instant) value
: Instant.ofEpochMilli(((org.joda.time.Instant) value).getMillis());
seconds = timestamp.getEpochSecond();
picoseconds = timestamp.getNano() * 1000L;
} else {
throw new IllegalArgumentException(
"Unsupported timestamp value type: " + value.getClass().getName());
}

converted =
DynamicMessage.newBuilder(fieldDescriptor.getMessageType())
.setField(fieldDescriptor.getMessageType().findFieldByName("seconds"), seconds)
.setField(
fieldDescriptor.getMessageType().findFieldByName("picoseconds"), picoseconds)
.build();

} else {
@Nullable
ThrowingBiFunction<String, Object, @Nullable Object> converter =
Expand Down Expand Up @@ -1633,13 +1714,28 @@ public static Object jsonValueFromMessageValue(
return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
} else if (fieldDescriptor.getType().equals(FieldDescriptor.Type.MESSAGE)) {
Message message = (Message) fieldValue;
String messageName = fieldDescriptor.getMessageType().getName();
if (TIMESTAMP_VALUE_DESCRIPTOR_NAMES.contains(
fieldDescriptor.getMessageType().getName())) {
Descriptor descriptor = message.getDescriptorForType();
long seconds = (long) message.getField(descriptor.findFieldByName("seconds"));
int nanos = (int) message.getField(descriptor.findFieldByName("nanos"));
Instant instant = Instant.ofEpochSecond(seconds, nanos);
return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
} else if (messageName.equals("TimestampPicos")) {
// Read the two components of the TimestampPicos message by field name.
Descriptor descriptor = message.getDescriptorForType();
long seconds = (long) message.getField(descriptor.findFieldByName("seconds"));
long picoseconds = (long) message.getField(descriptor.findFieldByName("picoseconds"));

// Convert to ISO timestamp string with picoseconds
Instant instant = Instant.ofEpochSecond(seconds);
String baseTimestamp = instant.toString(); // "2024-01-15T10:30:45Z"

// Format picoseconds as 12-digit string. Locale.ROOT keeps the digits ASCII; the default
// locale may substitute its own digit characters, which would make the output unparseable.
String picosPart = String.format(Locale.ROOT, "%012d", picoseconds);

// Insert before 'Z': "2024-01-15T10:30:45Z" → "2024-01-15T10:30:45.123456789012Z"
return baseTimestamp.replace("Z", "." + picosPart + "Z");
} else {
throw new RuntimeException(
"Not implemented yet " + fieldDescriptor.getMessageType().getFullName());
Expand Down
Loading
Loading