Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
* ([#X](https://github.com/apache/beam/issues/X)).
-->


# [2.72.0] - Unreleased

## Highlights
Expand All @@ -65,7 +64,7 @@

## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Add support for Datadog IO (Java) ([#37318](https://github.com/apache/beam/issues/37318)).

## New Features / Improvements

Expand Down Expand Up @@ -113,6 +112,7 @@

## Known Issues


# [2.70.0] - 2025-12-16

## Highlights
Expand Down Expand Up @@ -196,7 +196,7 @@ Now Beam has full support for Milvus integration including Milvus enrichment and

## Highlights

* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.
* (Python) Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.

## I/Os

Expand All @@ -212,7 +212,7 @@ Now Beam has full support for Milvus integration including Milvus enrichment and
Beam now supports data enrichment capabilities using SQL databases, with built-in support for:
- Managed PostgreSQL, MySQL, and Microsoft SQL Server instances on CloudSQL
- Unmanaged SQL database instances not hosted on CloudSQL (e.g., self-hosted or on-premises databases)
* [Python] Added the `ReactiveThrottler` and `ThrottlingSignaler` classes to streamline throttling behavior in DoFns, expose throttling mechanisms for users ([#35984](https://github.com/apache/beam/pull/35984))
* (Python) Added the `ReactiveThrottler` and `ThrottlingSignaler` classes to streamline throttling behavior in DoFns, expose throttling mechanisms for users ([#35984](https://github.com/apache/beam/pull/35984))
* Added a pipeline option to specify the processing timeout for a single element by any PTransform (Java/Python/Go) ([#35174](https://github.com/apache/beam/issues/35174)).
- When specified, the SDK harness automatically restarts if an element takes too long to process. Beam runner may then retry processing of the same work item.
- Use the `--element_processing_timeout_minutes` option to reduce the chance of having stalled pipelines due to unexpected cases of slow processing, where slowness might not happen again if processing of the same element is retried.
Expand Down Expand Up @@ -2351,4 +2351,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss

## Highlights

- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@ class BeamModulePlugin implements Plugin<Project> {
activemq_mqtt : "org.apache.activemq:activemq-mqtt:$activemq_version",
args4j : "args4j:args4j:2.33",
auto_value_annotations : "com.google.auto.value:auto-value-annotations:$autovalue_version",
auto_value : "com.google.auto.value:auto-value:$autovalue_version",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see below, this isn't needed

// TODO: https://github.com/apache/beam/issues/34993 after stopping supporting Java 8
avro : "org.apache.avro:avro:1.11.4",
aws_java_sdk2_apache_client : "software.amazon.awssdk:apache-client:$aws_java_sdk2_version",
Expand Down
49 changes: 49 additions & 0 deletions sdks/java/io/datadog/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins { id 'org.apache.beam.module' }
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.datadog'
)

description = "Apache Beam :: SDKs :: Java :: IO :: Datadog"
ext.summary = "IO to read and write to Datadog."

dependencies {
implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.joda_time
implementation library.java.slf4j_api
implementation library.java.google_http_client
implementation library.java.google_code_gson
implementation library.java.auto_value_annotations
annotationProcessor library.java.auto_value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should already be handled by BeamModulePlugin:

"com.google.auto.value:auto-value:$autovalue_version",

and not needed here

testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.jupiter_api
testRuntimeOnly library.java.jupiter_engine
testImplementation library.java.jupiter_params
testImplementation library.java.truth
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testImplementation project(path: ":sdks:java:io:common")
testImplementation group: 'org.mock-server', name: 'mockserver-client-java', version: '5.10.0'
testImplementation group: 'org.mock-server', name: 'mockserver-junit-rule', version: '5.10.0'
implementation library.java.google_http_client_apache_v2
implementation library.java.http_client
implementation library.java.http_core
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.datadog;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer to use checkframework Nullable


/** A class for Datadog events. */
@AutoValue
public abstract class DatadogEvent {

public static Builder newBuilder() {
return new AutoValue_DatadogEvent.Builder();
}

@Nullable
public abstract String ddsource();

@Nullable
public abstract String ddtags();

@Nullable
public abstract String hostname();

@Nullable
public abstract String service();

@Nullable
public abstract String message();

/** A builder class for creating {@link DatadogEvent} objects. */
@AutoValue.Builder
public abstract static class Builder {

abstract Builder setDdsource(String source);

abstract Builder setDdtags(String tags);

abstract Builder setHostname(String hostname);

abstract Builder setService(String service);

abstract Builder setMessage(String message);

abstract String message();

abstract DatadogEvent autoBuild();

public Builder withSource(String source) {
checkNotNull(source, "withSource(source) called with null input.");

return setDdsource(source);
}

public Builder withTags(String tags) {
checkNotNull(tags, "withTags(tags) called with null input.");

return setDdtags(tags);
}

public Builder withHostname(String hostname) {
checkNotNull(hostname, "withHostname(hostname) called with null input.");

return setHostname(hostname);
}

public Builder withService(String service) {
checkNotNull(service, "withService(service) called with null input.");

return setService(service);
}

public Builder withMessage(String message) {
checkNotNull(message, "withMessage(message) called with null input.");

return setMessage(message);
}

public DatadogEvent build() {
checkNotNull(message(), "Message is required.");

return autoBuild();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.datadog;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.values.TypeDescriptor;

/** A {@link org.apache.beam.sdk.coders.Coder} for {@link DatadogEvent} objects. */
public class DatadogEventCoder extends AtomicCoder<DatadogEvent> {

private static final DatadogEventCoder DATADOG_EVENT_CODER = new DatadogEventCoder();

private static final TypeDescriptor<DatadogEvent> TYPE_DESCRIPTOR =
new TypeDescriptor<DatadogEvent>() {};
private static final StringUtf8Coder STRING_UTF_8_CODER = StringUtf8Coder.of();
private static final NullableCoder<String> STRING_NULLABLE_CODER =
NullableCoder.of(STRING_UTF_8_CODER);

public static DatadogEventCoder of() {
return DATADOG_EVENT_CODER;
}

@Override
public void encode(DatadogEvent value, OutputStream out) throws IOException {
STRING_NULLABLE_CODER.encode(value.ddsource(), out);
STRING_NULLABLE_CODER.encode(value.ddtags(), out);
STRING_NULLABLE_CODER.encode(value.hostname(), out);
STRING_NULLABLE_CODER.encode(value.service(), out);
STRING_NULLABLE_CODER.encode(value.message(), out);
}

@Override
public DatadogEvent decode(InputStream in) throws IOException {
DatadogEvent.Builder builder = DatadogEvent.newBuilder();

String source = STRING_NULLABLE_CODER.decode(in);
if (source != null) {
builder.withSource(source);
}

String tags = STRING_NULLABLE_CODER.decode(in);
if (tags != null) {
builder.withTags(tags);
}

String hostname = STRING_NULLABLE_CODER.decode(in);
if (hostname != null) {
builder.withHostname(hostname);
}

String service = STRING_NULLABLE_CODER.decode(in);
if (service != null) {
builder.withService(service);
}

String message = STRING_NULLABLE_CODER.decode(in);
if (message != null) {
builder.withMessage(message);
}

return builder.build();
}

@Override
public TypeDescriptor<DatadogEvent> getEncodedTypeDescriptor() {
return TYPE_DESCRIPTOR;
}

@Override
public void verifyDeterministic() throws NonDeterministicException {
throw new NonDeterministicException(
this, "DatadogEvent can hold arbitrary instances, which may be non-deterministic.");
}
}
Loading
Loading