diff --git a/CHANGES.md b/CHANGES.md index 1dd3908bf33a..f86a89fab4c0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -55,7 +55,6 @@ * ([#X](https://github.com/apache/beam/issues/X)). --> - # [2.72.0] - Unreleased ## Highlights @@ -65,7 +64,7 @@ ## I/Os -* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Add support for Datadog IO (Java) ([#37318](https://github.com/apache/beam/issues/37318)). ## New Features / Improvements @@ -113,6 +112,7 @@ ## Known Issues + # [2.70.0] - 2025-12-16 ## Highlights @@ -196,7 +196,7 @@ Now Beam has full support for Milvus integration including Milvus enrichment and ## Highlights -* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues. +* (Python) Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues. ## I/Os @@ -212,7 +212,7 @@ Now Beam has full support for Milvus integration including Milvus enrichment and Beam now supports data enrichment capabilities using SQL databases, with built-in support for: - Managed PostgreSQL, MySQL, and Microsoft SQL Server instances on CloudSQL - Unmanaged SQL database instances not hosted on CloudSQL (e.g., self-hosted or on-premises databases) -* [Python] Added the `ReactiveThrottler` and `ThrottlingSignaler` classes to streamline throttling behavior in DoFns, expose throttling mechanisms for users ([#35984](https://github.com/apache/beam/pull/35984)) +* (Python) Added the `ReactiveThrottler` and `ThrottlingSignaler` classes to streamline throttling behavior in DoFns, expose throttling mechanisms for users ([#35984](https://github.com/apache/beam/pull/35984)) * Added a pipeline option to specify the processing timeout for a single element by any PTransform (Java/Python/Go) ([#35174](https://github.com/apache/beam/issues/35174)). - When specified, the SDK harness automatically restarts if an element takes too long to process. Beam runner may then retry processing of the same work item. - Use the `--element_processing_timeout_minutes` option to reduce the chance of having stalled pipelines due to unexpected cases of slow processing, where slowness might not happen again if processing of the same element is retried. @@ -2351,4 +2351,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss ## Highlights -- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/). +- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/). 
\ No newline at end of file diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index fdbca15e0003..b5b44497ce76 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -679,6 +679,7 @@ class BeamModulePlugin implements Plugin { activemq_mqtt : "org.apache.activemq:activemq-mqtt:$activemq_version", args4j : "args4j:args4j:2.33", auto_value_annotations : "com.google.auto.value:auto-value-annotations:$autovalue_version", + auto_value : "com.google.auto.value:auto-value:$autovalue_version", // TODO: https://github.com/apache/beam/issues/34993 after stopping supporting Java 8 avro : "org.apache.avro:avro:1.11.4", aws_java_sdk2_apache_client : "software.amazon.awssdk:apache-client:$aws_java_sdk2_version", diff --git a/sdks/java/io/datadog/build.gradle b/sdks/java/io/datadog/build.gradle new file mode 100644 index 000000000000..41f5e5b7d14e --- /dev/null +++ b/sdks/java/io/datadog/build.gradle @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.io.datadog' +) + +description = "Apache Beam :: SDKs :: Java :: IO :: Datadog" +ext.summary = "IO to read and write to Datadog." 
+ +dependencies { + implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.vendored_guava_32_1_2_jre + implementation library.java.joda_time + implementation library.java.slf4j_api + implementation library.java.google_http_client + implementation library.java.google_code_gson + implementation library.java.auto_value_annotations + annotationProcessor library.java.auto_value + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") + testImplementation library.java.jupiter_api + testRuntimeOnly library.java.jupiter_engine + testImplementation library.java.jupiter_params + testImplementation library.java.truth + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") + testImplementation project(path: ":sdks:java:io:common") + testImplementation group: 'org.mock-server', name: 'mockserver-client-java', version: '5.10.0' + testImplementation group: 'org.mock-server', name: 'mockserver-junit-rule', version: '5.10.0' + implementation library.java.google_http_client_apache_v2 + implementation library.java.http_client + implementation library.java.http_core +} diff --git a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEvent.java b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEvent.java new file mode 100644 index 000000000000..02986cdc19d8 --- /dev/null +++ b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEvent.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.datadog; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import javax.annotation.Nullable; + +/** A class for Datadog events. */ +@AutoValue +public abstract class DatadogEvent { + + public static Builder newBuilder() { + return new AutoValue_DatadogEvent.Builder(); + } + + @Nullable + public abstract String ddsource(); + + @Nullable + public abstract String ddtags(); + + @Nullable + public abstract String hostname(); + + @Nullable + public abstract String service(); + + @Nullable + public abstract String message(); + + /** A builder class for creating {@link DatadogEvent} objects. 
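+ *
+ * <p>A minimal construction sketch; only the message is required and every other field is
+ * optional (the field values shown here are illustrative):
+ *
+ * <pre>{@code
+ * DatadogEvent event =
+ *     DatadogEvent.newBuilder()
+ *         .withSource("my-app")
+ *         .withTags("env:prod")
+ *         .withHostname("worker-1")
+ *         .withService("checkout")
+ *         .withMessage("user checked out")
+ *         .build();
+ * }</pre>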
*/ + @AutoValue.Builder + public abstract static class Builder { + + abstract Builder setDdsource(String source); + + abstract Builder setDdtags(String tags); + + abstract Builder setHostname(String hostname); + + abstract Builder setService(String service); + + abstract Builder setMessage(String message); + + abstract String message(); + + abstract DatadogEvent autoBuild(); + + public Builder withSource(String source) { + checkNotNull(source, "withSource(source) called with null input."); + + return setDdsource(source); + } + + public Builder withTags(String tags) { + checkNotNull(tags, "withTags(tags) called with null input."); + + return setDdtags(tags); + } + + public Builder withHostname(String hostname) { + checkNotNull(hostname, "withHostname(hostname) called with null input."); + + return setHostname(hostname); + } + + public Builder withService(String service) { + checkNotNull(service, "withService(service) called with null input."); + + return setService(service); + } + + public Builder withMessage(String message) { + checkNotNull(message, "withMessage(message) called with null input."); + + return setMessage(message); + } + + public DatadogEvent build() { + checkNotNull(message(), "Message is required."); + + return autoBuild(); + } + } +} diff --git a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventCoder.java b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventCoder.java new file mode 100644 index 000000000000..4e5de996ef51 --- /dev/null +++ b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventCoder.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.datadog; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** A {@link org.apache.beam.sdk.coders.Coder} for {@link DatadogEvent} objects. 
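+ *
+ * <p>A sketch of how a pipeline might attach this coder explicitly to a
+ * {@code PCollection<DatadogEvent>}; registering it in the pipeline's coder registry works as
+ * well (the upstream step is hypothetical):
+ *
+ * <pre>{@code
+ * PCollection<DatadogEvent> events = ...; // e.g. output of an upstream conversion step
+ * events.setCoder(DatadogEventCoder.of());
+ * }</pre>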
*/ +public class DatadogEventCoder extends AtomicCoder { + + private static final DatadogEventCoder DATADOG_EVENT_CODER = new DatadogEventCoder(); + + private static final TypeDescriptor TYPE_DESCRIPTOR = + new TypeDescriptor() {}; + private static final StringUtf8Coder STRING_UTF_8_CODER = StringUtf8Coder.of(); + private static final NullableCoder STRING_NULLABLE_CODER = + NullableCoder.of(STRING_UTF_8_CODER); + + public static DatadogEventCoder of() { + return DATADOG_EVENT_CODER; + } + + @Override + public void encode(DatadogEvent value, OutputStream out) throws IOException { + STRING_NULLABLE_CODER.encode(value.ddsource(), out); + STRING_NULLABLE_CODER.encode(value.ddtags(), out); + STRING_NULLABLE_CODER.encode(value.hostname(), out); + STRING_NULLABLE_CODER.encode(value.service(), out); + STRING_NULLABLE_CODER.encode(value.message(), out); + } + + @Override + public DatadogEvent decode(InputStream in) throws IOException { + DatadogEvent.Builder builder = DatadogEvent.newBuilder(); + + String source = STRING_NULLABLE_CODER.decode(in); + if (source != null) { + builder.withSource(source); + } + + String tags = STRING_NULLABLE_CODER.decode(in); + if (tags != null) { + builder.withTags(tags); + } + + String hostname = STRING_NULLABLE_CODER.decode(in); + if (hostname != null) { + builder.withHostname(hostname); + } + + String service = STRING_NULLABLE_CODER.decode(in); + if (service != null) { + builder.withService(service); + } + + String message = STRING_NULLABLE_CODER.decode(in); + if (message != null) { + builder.withMessage(message); + } + + return builder.build(); + } + + @Override + public TypeDescriptor getEncodedTypeDescriptor() { + return TYPE_DESCRIPTOR; + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + throw new NonDeterministicException( + this, "DatadogEvent can hold arbitrary instances, which may be non-deterministic."); + } +} diff --git a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisher.java b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisher.java new file mode 100644 index 000000000000..00a106b2ded8 --- /dev/null +++ b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisher.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.datadog; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.client.http.ByteArrayContent; +import com.google.api.client.http.GZipEncoding; +import com.google.api.client.http.GenericUrl; +import com.google.api.client.http.HttpBackOffIOExceptionHandler; +import com.google.api.client.http.HttpContent; +import com.google.api.client.http.HttpMediaType; +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpRequestFactory; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpUnsuccessfulResponseHandler; +import com.google.api.client.http.apache.v2.ApacheHttpTransport; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.ExponentialBackOff; +import com.google.api.client.util.Sleeper; +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.Set; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.http.client.config.CookieSpecs; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.conn.ssl.DefaultHostnameVerifier; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.ssl.SSLContextBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link DatadogEventPublisher} is a utility class that helps write {@link DatadogEvent}s to a + * Datadog Logs API endpoint. 
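+ *
+ * <p>A rough usage sketch; the intake URL and API key shown are illustrative, not defaults
+ * provided by this class:
+ *
+ * <pre>{@code
+ * DatadogEventPublisher publisher =
+ *     DatadogEventPublisher.newBuilder()
+ *         .withUrl("https://http-intake.logs.datadoghq.com")
+ *         .withApiKey(apiKey)
+ *         .build();
+ * try {
+ *   publisher.execute(events); // POSTs the batch to the api/v2/logs path
+ * } finally {
+ *   publisher.close();
+ * }
+ * }</pre>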
+ */ +@AutoValue +public abstract class DatadogEventPublisher { + + private static final Logger LOG = LoggerFactory.getLogger(DatadogEventPublisher.class); + + private static final int DEFAULT_MAX_CONNECTIONS = 1; + + @VisibleForTesting protected static final String DD_URL_PATH = "api/v2/logs"; + + private static final String DD_API_KEY_HEADER = "dd-api-key"; + + private static final String DD_ORIGIN_HEADER = "dd-evp-origin"; + private static final String DD_ORIGIN_DATAFLOW = "dataflow"; + + private static final HttpMediaType MEDIA_TYPE = + new HttpMediaType("application/json;charset=utf-8"); + + private static final String CONTENT_TYPE = + Joiner.on('/').join(MEDIA_TYPE.getType(), MEDIA_TYPE.getSubType()); + + private static final String HTTPS_PROTOCOL_PREFIX = "https"; + + public static Builder newBuilder() { + return new AutoValue_DatadogEventPublisher.Builder() + .withMaxElapsedMillis(ExponentialBackOff.DEFAULT_MAX_ELAPSED_TIME_MILLIS); + } + + abstract ApacheHttpTransport transport(); + + abstract HttpRequestFactory requestFactory(); + + abstract GenericUrl genericUrl(); + + abstract String apiKey(); + + abstract Integer maxElapsedMillis(); + + /** + * Executes a POST for the list of {@link DatadogEvent} objects into Datadog's Logs API. + * + * @param events List of {@link DatadogEvent}s + * @return {@link HttpResponse} for the POST. + */ + public HttpResponse execute(List events) throws IOException { + + HttpContent content = getContent(events); + HttpRequest request = requestFactory().buildPostRequest(genericUrl(), content); + + request.setEncoding(new GZipEncoding()); + request.setUnsuccessfulResponseHandler( + new HttpSendLogsUnsuccessfulResponseHandler(getConfiguredBackOff())); + request.setIOExceptionHandler(new HttpBackOffIOExceptionHandler(getConfiguredBackOff())); + + setHeaders(request, apiKey()); + + return request.execute(); + } + + /** + * Same as {@link DatadogEventPublisher#execute(List)} but with a single {@link DatadogEvent}. + * + * @param event {@link DatadogEvent} object. + */ + public HttpResponse execute(DatadogEvent event) throws IOException { + return this.execute(ImmutableList.of(event)); + } + + /** + * Return an {@link ExponentialBackOff} with the right settings. + * + * @return {@link ExponentialBackOff} object. + */ + @VisibleForTesting + protected ExponentialBackOff getConfiguredBackOff() { + return new ExponentialBackOff.Builder().setMaxElapsedTimeMillis(maxElapsedMillis()).build(); + } + + /** Shutdown connection manager and releases all resources. */ + public void close() throws IOException { + if (transport() != null) { + LOG.info("Closing publisher transport."); + transport().shutdown(); + } + } + + /** + * Utility method to set http headers into the {@link HttpRequest}. + * + * @param request {@link HttpRequest} object to add headers to. + * @param apiKey Datadog's Logs API key. + */ + private void setHeaders(HttpRequest request, String apiKey) { + request.getHeaders().set(DD_API_KEY_HEADER, apiKey); + request.getHeaders().set(DD_ORIGIN_HEADER, DD_ORIGIN_DATAFLOW); + request.getHeaders().setContentEncoding("gzip"); + } + + /** + * Utility method to marshall a list of {@link DatadogEvent}s into an {@link HttpContent} object + * that can be used to create an {@link HttpRequest}. + * + * @param events List of {@link DatadogEvent}s + * @return {@link HttpContent} that can be used to create an {@link HttpRequest}. 
+ */ + @VisibleForTesting + protected HttpContent getContent(List events) { + String payload = DatadogEventSerializer.getPayloadString(events); + LOG.debug("Payload content: {}", payload); + return ByteArrayContent.fromString(CONTENT_TYPE, payload); + } + + static class HttpSendLogsUnsuccessfulResponseHandler implements HttpUnsuccessfulResponseHandler { + /* + See: https://docs.datadoghq.com/api/latest/logs/#send-logs + 408: Request Timeout, request should be retried after some time + 429: Too Many Requests, request should be retried after some time + */ + private static final Set RETRYABLE_4XX_CODES = ImmutableSet.of(408, 429); + + private final Sleeper sleeper = Sleeper.DEFAULT; + private final BackOff backOff; + + HttpSendLogsUnsuccessfulResponseHandler(BackOff backOff) { + this.backOff = Preconditions.checkNotNull(backOff); + } + + @Override + public boolean handleResponse(HttpRequest req, HttpResponse res, boolean supportsRetry) + throws IOException { + if (!supportsRetry) { + return false; + } + + boolean is5xxStatusCode = res.getStatusCode() / 100 == 5; + boolean isRetryable4xxStatusCode = RETRYABLE_4XX_CODES.contains(res.getStatusCode()); + if (is5xxStatusCode || isRetryable4xxStatusCode) { + try { + return BackOffUtils.next(sleeper, backOff); + } catch (InterruptedException exception) { + // Mark thread as interrupted since we cannot throw InterruptedException here. + Thread.currentThread().interrupt(); + } + } + return false; + } + } + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setTransport(ApacheHttpTransport transport); + + abstract ApacheHttpTransport transport(); + + abstract Builder setRequestFactory(HttpRequestFactory requestFactory); + + abstract HttpRequestFactory requestFactory(); + + abstract Builder setGenericUrl(GenericUrl genericUrl); + + abstract GenericUrl genericUrl(); + + abstract Builder setApiKey(String apiKey); + + abstract String apiKey(); + + abstract Builder setMaxElapsedMillis(Integer maxElapsedMillis); + + abstract Integer maxElapsedMillis(); + + abstract DatadogEventPublisher autoBuild(); + + /** + * Method to set the Datadog Logs API URL. + * + * @param url Logs API URL + * @return {@link Builder} + */ + public Builder withUrl(String url) throws UnsupportedEncodingException { + checkNotNull(url, "withUrl(url) called with null input."); + return setGenericUrl(getGenericUrl(url)); + } + + /** + * Method to set the Datadog Logs API key. + * + * @param apiKey Logs API key. + * @return {@link Builder} + */ + public Builder withApiKey(String apiKey) { + checkNotNull(apiKey, "withApiKey(apiKey) called with null input."); + return setApiKey(apiKey); + } + + /** + * Method to max timeout for {@link ExponentialBackOff}. Otherwise uses the default setting for + * {@link ExponentialBackOff}. + * + * @param maxElapsedMillis max elapsed time in milliseconds for timeout. + * @return {@link Builder} + */ + public Builder withMaxElapsedMillis(Integer maxElapsedMillis) { + checkNotNull( + maxElapsedMillis, "withMaxElapsedMillis(maxElapsedMillis) called with null input."); + return setMaxElapsedMillis(maxElapsedMillis); + } + + /** + * Validates and builds a {@link DatadogEventPublisher} object. 
+ * + * @return {@link DatadogEventPublisher} + */ + public DatadogEventPublisher build() throws NoSuchAlgorithmException, KeyManagementException { + + checkNotNull(apiKey(), "API Key needs to be specified via withApiKey(apiKey)."); + checkNotNull(genericUrl(), "URL needs to be specified via withUrl(url)."); + + CloseableHttpClient httpClient = getHttpClient(DEFAULT_MAX_CONNECTIONS); + + setTransport(new ApacheHttpTransport(httpClient)); + setRequestFactory(transport().createRequestFactory()); + + return autoBuild(); + } + + /** + * Utility method to convert a baseUrl into a {@link GenericUrl}. + * + * @param baseUrl url pointing to the Logs API endpoint. + * @return {@link GenericUrl} + */ + private GenericUrl getGenericUrl(String baseUrl) { + String url = Joiner.on('/').join(baseUrl, DD_URL_PATH); + + return new GenericUrl(url); + } + + /** + * Utility method to create a {@link CloseableHttpClient} to make http POSTs against Datadog's + * Logs API. + */ + private CloseableHttpClient getHttpClient(int maxConnections) + throws NoSuchAlgorithmException, KeyManagementException { + + HttpClientBuilder builder = ApacheHttpTransport.newDefaultHttpClientBuilder(); + + if (genericUrl().getScheme().equalsIgnoreCase(HTTPS_PROTOCOL_PREFIX)) { + LOG.info("SSL connection requested"); + + HostnameVerifier hostnameVerifier = new DefaultHostnameVerifier(); + + SSLContext sslContext = SSLContextBuilder.create().build(); + + SSLConnectionSocketFactory connectionSocketFactory = + new SSLConnectionSocketFactory(sslContext, hostnameVerifier); + builder.setSSLSocketFactory(connectionSocketFactory); + } + + builder.setMaxConnTotal(maxConnections); + builder.setDefaultRequestConfig( + RequestConfig.custom().setCookieSpec(CookieSpecs.STANDARD).build()); + + return builder.build(); + } + } +} diff --git a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializer.java b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializer.java new file mode 100644 index 000000000000..1a3886827291 --- /dev/null +++ b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializer.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.datadog; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import java.nio.charset.StandardCharsets; +import java.util.List; + +public class DatadogEventSerializer { + private static final Gson GSON = + new GsonBuilder().setFieldNamingStrategy(f -> f.getName().toLowerCase()).create(); + + private DatadogEventSerializer() {} + + /** Utility method to get payload string from a list of {@link DatadogEvent}s. 
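+   *
+   * <p>For illustration, a batch containing a single event with only a message set serializes
+   * to JSON along the lines of {@code [{"message":"hello"}]}; unset (null) fields are omitted
+   * by Gson's default null handling, and field names are lower-cased by the configured naming
+   * strategy.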
*/ + public static String getPayloadString(List events) { + return GSON.toJson(events); + } + + /** Utility method to get payload string from a {@link DatadogEvent}. */ + public static String getPayloadString(DatadogEvent event) { + return GSON.toJson(event); + } + + /** Utility method to get payload size from a string. */ + public static long getPayloadSize(String payload) { + return payload.getBytes(StandardCharsets.UTF_8).length; + } +} diff --git a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventWriter.java b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventWriter.java new file mode 100644 index 000000000000..6de3a1b86e2e --- /dev/null +++ b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogEventWriter.java @@ -0,0 +1,521 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.datadog; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpResponseException; +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nullable; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.InetAddresses; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.InternetDomainName; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** A {@link DoFn} to write {@link DatadogEvent}s to Datadog's Logs API. 
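+ *
+ * <p>The writer buffers events in keyed state and flushes a batch when the configured batch
+ * count or buffer size is reached, or when the event-time flush timer fires. A construction
+ * sketch (values are illustrative; the keyed input is normally produced by
+ * {@link DatadogIO.Write}):
+ *
+ * <pre>{@code
+ * DatadogEventWriter writer =
+ *     DatadogEventWriter.newBuilder()
+ *         .withUrl("https://http-intake.logs.datadoghq.com")
+ *         .withApiKey(apiKey)
+ *         .withInputBatchCount(100)
+ *         .build();
+ * }</pre>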
*/ +@AutoValue +public abstract class DatadogEventWriter + extends DoFn, DatadogWriteError> { + + private static final Integer MIN_BATCH_COUNT = 10; + private static final Integer DEFAULT_BATCH_COUNT = 100; + private static final Integer MAX_BATCH_COUNT = 1000; + private static final Logger LOG = LoggerFactory.getLogger(DatadogEventWriter.class); + private static final long DEFAULT_FLUSH_DELAY = 2; + private static final Long MAX_BUFFER_SIZE = 5L * 1000 * 1000; // 5MB + private static final Counter INPUT_COUNTER = + Metrics.counter(DatadogEventWriter.class, "inbound-events"); + private static final Counter SUCCESS_WRITES = + Metrics.counter(DatadogEventWriter.class, "outbound-successful-events"); + private static final Counter FAILED_WRITES = + Metrics.counter(DatadogEventWriter.class, "outbound-failed-events"); + private static final Counter INVALID_REQUESTS = + Metrics.counter(DatadogEventWriter.class, "http-invalid-requests"); + private static final Counter SERVER_ERROR_REQUESTS = + Metrics.counter(DatadogEventWriter.class, "http-server-error-requests"); + private static final Counter VALID_REQUESTS = + Metrics.counter(DatadogEventWriter.class, "http-valid-requests"); + private static final Distribution SUCCESSFUL_WRITE_LATENCY_MS = + Metrics.distribution(DatadogEventWriter.class, "successful_write_to_datadog_latency_ms"); + private static final Distribution UNSUCCESSFUL_WRITE_LATENCY_MS = + Metrics.distribution(DatadogEventWriter.class, "unsuccessful_write_to_datadog_latency_ms"); + private static final Distribution SUCCESSFUL_WRITE_BATCH_SIZE = + Metrics.distribution(DatadogEventWriter.class, "write_to_datadog_batch"); + private static final Distribution SUCCESSFUL_WRITE_PAYLOAD_SIZE = + Metrics.distribution(DatadogEventWriter.class, "write_to_datadog_bytes"); + private static final String BUFFER_STATE_NAME = "buffer"; + private static final String COUNT_STATE_NAME = "count"; + private static final String BUFFER_SIZE_STATE_NAME = "buffer_size"; + private static final String TIME_ID_NAME = "expiry"; + private static final Pattern URL_PATTERN = Pattern.compile("^http(s?)://([^:]+)(:[0-9]+)?$"); + + @VisibleForTesting + protected static final String INVALID_URL_FORMAT_MESSAGE = + "Invalid url format. Url format should match PROTOCOL://HOST[:PORT], where PORT is optional. " + + "Supported Protocols are http and https. 
eg: http://hostname:8088"; + + @StateId(BUFFER_STATE_NAME) + private final StateSpec> buffer = StateSpecs.bag(); + + @StateId(COUNT_STATE_NAME) + private final StateSpec> count = StateSpecs.value(); + + @StateId(BUFFER_SIZE_STATE_NAME) + private final StateSpec> bufferSize = StateSpecs.value(); + + @TimerId(TIME_ID_NAME) + private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + private Integer batchCount; + private Long maxBufferSize; + @Nullable private transient DatadogEventPublisher publisher; + + DatadogEventWriter() { + this.batchCount = DEFAULT_BATCH_COUNT; + this.maxBufferSize = MAX_BUFFER_SIZE; + this.publisher = null; + } + + public static Builder newBuilder() { + return newBuilder(MIN_BATCH_COUNT); + } + + public static Builder newBuilder(@Nullable Integer minBatchCount) { + return new AutoValue_DatadogEventWriter.Builder() + .setMinBatchCount(MoreObjects.firstNonNull(minBatchCount, MIN_BATCH_COUNT)); + } + + @Nullable + abstract String url(); + + @Nullable + abstract String apiKey(); + + @Nullable + abstract Integer minBatchCount(); + + @Nullable + abstract Integer inputBatchCount(); + + @Nullable + abstract Long maxBufferSize(); + + @Setup + public void setup() { + + final String url = url(); + if (url == null) { + throw new IllegalArgumentException("url is required for writing events."); + } + checkArgument(isValidUrlFormat(url), INVALID_URL_FORMAT_MESSAGE); + final String apiKey = apiKey(); + if (apiKey == null) { + throw new IllegalArgumentException("API Key is required for writing events."); + } + + batchCount = MoreObjects.firstNonNull(inputBatchCount(), DEFAULT_BATCH_COUNT); + LOG.info("Batch count set to: {}", batchCount); + + maxBufferSize = MoreObjects.firstNonNull(maxBufferSize(), MAX_BUFFER_SIZE); + LOG.info("Max buffer size set to: {}", maxBufferSize); + + checkArgument( + batchCount >= MoreObjects.firstNonNull(minBatchCount(), MIN_BATCH_COUNT), + "batchCount must be greater than or equal to %s", + minBatchCount()); + checkArgument( + batchCount <= MAX_BATCH_COUNT, + "batchCount must be less than or equal to %s", + MAX_BATCH_COUNT); + + try { + DatadogEventPublisher.Builder builder = + DatadogEventPublisher.newBuilder().withUrl(url).withApiKey(apiKey); + + publisher = builder.build(); + } catch (IOException | NoSuchAlgorithmException | KeyManagementException e) { + LOG.error("Error creating HttpEventPublisher: ", e); + throw new RuntimeException(e); + } + } + + @ProcessElement + public void processElement( + @Element KV input, + OutputReceiver receiver, + BoundedWindow window, + @StateId(BUFFER_STATE_NAME) BagState bufferState, + @StateId(COUNT_STATE_NAME) ValueState countState, + @StateId(BUFFER_SIZE_STATE_NAME) ValueState bufferSizeState, + @TimerId(TIME_ID_NAME) Timer timer) + throws IOException { + + DatadogEvent event = input.getValue(); + INPUT_COUNTER.inc(); + + String eventPayload = DatadogEventSerializer.getPayloadString(event); + long eventPayloadSize = DatadogEventSerializer.getPayloadSize(eventPayload); + if (eventPayloadSize > maxBufferSize) { + LOG.error( + "Error processing event of size {} due to exceeding max buffer size", eventPayloadSize); + DatadogWriteError error = DatadogWriteError.newBuilder().withPayload(eventPayload).build(); + receiver.output(error); + return; + } + + timer.offset(Duration.standardSeconds(DEFAULT_FLUSH_DELAY)).setRelative(); + + long count = MoreObjects.firstNonNull(countState.read(), 0L); + long bufferSize = MoreObjects.firstNonNull(bufferSizeState.read(), 0L); + if (bufferSize + eventPayloadSize 
> maxBufferSize) { + LOG.debug("Flushing batch of {} events of size {} due to max buffer size", count, bufferSize); + flush(receiver, bufferState, countState, bufferSizeState); + + count = 0L; + bufferSize = 0L; + } + + bufferState.add(event); + + count = count + 1L; + countState.write(count); + + bufferSize = bufferSize + eventPayloadSize; + bufferSizeState.write(bufferSize); + + if (count >= batchCount) { + LOG.debug("Flushing batch of {} events of size {} due to batch count", count, bufferSize); + flush(receiver, bufferState, countState, bufferSizeState); + } + } + + @OnTimer(TIME_ID_NAME) + public void onExpiry( + OutputReceiver receiver, + @StateId(BUFFER_STATE_NAME) BagState bufferState, + @StateId(COUNT_STATE_NAME) ValueState countState, + @StateId(BUFFER_SIZE_STATE_NAME) ValueState bufferSizeState) + throws IOException { + + long count = MoreObjects.firstNonNull(countState.read(), 0L); + long bufferSize = MoreObjects.firstNonNull(bufferSizeState.read(), 0L); + + if (count > 0) { + LOG.debug("Flushing batch of {} events of size {} due to timer", count, bufferSize); + flush(receiver, bufferState, countState, bufferSizeState); + } + } + + @Teardown + public void tearDown() { + if (this.publisher != null) { + try { + this.publisher.close(); + LOG.info("Successfully closed HttpEventPublisher"); + + } catch (IOException e) { + LOG.warn("Received exception while closing HttpEventPublisher: ", e); + } + } + } + + /** + * Utility method to flush a batch of events via {@link DatadogEventPublisher}. + * + * @param receiver Receiver to write {@link DatadogWriteError}s to + */ + private void flush( + OutputReceiver receiver, + @StateId(BUFFER_STATE_NAME) BagState bufferState, + @StateId(COUNT_STATE_NAME) ValueState countState, + @StateId(BUFFER_SIZE_STATE_NAME) ValueState bufferSizeState) + throws IOException { + + if (!bufferState.isEmpty().read()) { + + long count = MoreObjects.firstNonNull(countState.read(), 0L); + long bufferSize = MoreObjects.firstNonNull(bufferSizeState.read(), 0L); + HttpResponse response = null; + List events = Lists.newArrayList(bufferState.read()); + long startTime = System.nanoTime(); + try { + // Important to close this response to avoid connection leak. 
+ response = checkNotNull(publisher).execute(events); + if (!response.isSuccessStatusCode()) { + UNSUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() - startTime)); + FAILED_WRITES.inc(count); + int statusCode = response.getStatusCode(); + if (statusCode >= 400 && statusCode < 500) { + INVALID_REQUESTS.inc(); + } else if (statusCode >= 500 && statusCode < 600) { + SERVER_ERROR_REQUESTS.inc(); + } + + logWriteFailures( + count, + response.getStatusCode(), + response.parseAsString(), + response.getStatusMessage()); + flushWriteFailures( + events, response.getStatusMessage(), response.getStatusCode(), receiver); + + } else { + SUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() - startTime)); + SUCCESS_WRITES.inc(count); + VALID_REQUESTS.inc(); + SUCCESSFUL_WRITE_BATCH_SIZE.update(count); + SUCCESSFUL_WRITE_PAYLOAD_SIZE.update(bufferSize); + + LOG.debug("Successfully wrote {} events", count); + } + + } catch (HttpResponseException e) { + UNSUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() - startTime)); + FAILED_WRITES.inc(count); + int statusCode = e.getStatusCode(); + if (statusCode >= 400 && statusCode < 500) { + INVALID_REQUESTS.inc(); + } else if (statusCode >= 500 && statusCode < 600) { + SERVER_ERROR_REQUESTS.inc(); + } + + logWriteFailures(count, e.getStatusCode(), e.getContent(), e.getStatusMessage()); + flushWriteFailures(events, e.getStatusMessage(), e.getStatusCode(), receiver); + + } catch (IOException ioe) { + UNSUCCESSFUL_WRITE_LATENCY_MS.update(nanosToMillis(System.nanoTime() - startTime)); + FAILED_WRITES.inc(count); + INVALID_REQUESTS.inc(); + + logWriteFailures(count, 0, ioe.getMessage(), null); + flushWriteFailures(events, ioe.getMessage(), null, receiver); + + } finally { + // States are cleared regardless of write success or failure since we + // write failed events to an output PCollection. + bufferState.clear(); + countState.clear(); + bufferSizeState.clear(); + + // We've observed cases where errors at this point can cause the pipeline to keep retrying + // the same events over and over (e.g. from Dataflow Runner's Pub/Sub implementation). Since + // the events have either been published or wrapped for error handling, we can safely + // ignore this error, though there may or may not be a leak of some type depending on + // HttpResponse's implementation. However, any potential leak would still happen if we let + // the exception fall through, so this isn't considered a major issue. + try { + if (response != null) { + response.ignore(); + } + } catch (IOException e) { + LOG.warn( + "Error ignoring response from Datadog. Messages should still have published, but there" + + " might be a connection leak.", + e); + } + } + } + } + + /** Utility method to log write failures. */ + private void logWriteFailures( + long count, int statusCode, @Nullable String content, @Nullable String statusMessage) { + LOG.error("Failed to write {} events", count); + LOG.error( + "Error writing to Datadog. StatusCode: {}, content: {}, StatusMessage: {}", + statusCode, + content, + statusMessage); + } + + /** + * Utility method to un-batch and flush failed write events. 
+ * + * @param events List of {@link DatadogEvent}s to un-batch + * @param statusMessage Status message to be added to {@link DatadogWriteError} + * @param statusCode Status code to be added to {@link DatadogWriteError} + * @param receiver Receiver to write {@link DatadogWriteError}s to + */ + private void flushWriteFailures( + List events, + @Nullable String statusMessage, + @Nullable Integer statusCode, + OutputReceiver receiver) { + + checkNotNull(events, "DatadogEvents cannot be null."); + + DatadogWriteError.Builder builder = DatadogWriteError.newBuilder(); + + if (statusMessage != null) { + builder.withStatusMessage(statusMessage); + } + + if (statusCode != null) { + builder.withStatusCode(statusCode); + } + + for (DatadogEvent event : events) { + String payload = DatadogEventSerializer.getPayloadString(event); + DatadogWriteError error = builder.withPayload(payload).build(); + receiver.output(error); + } + } + + /** + * Checks whether the Logs API URL matches the format PROTOCOL://HOST[:PORT]. + * + * @param url for Logs API + * @return true if the URL is valid + */ + private static boolean isValidUrlFormat(@Nullable String url) { + if (url == null) { + return false; + } + Matcher matcher = URL_PATTERN.matcher(url); + if (matcher.find()) { + String host = matcher.group(2); + if (host == null) { + return false; + } + return InetAddresses.isInetAddress(host) || InternetDomainName.isValid(host); + } + return false; + } + + /** + * Converts Nanoseconds to Milliseconds. + * + * @param ns time in nanoseconds + * @return time in milliseconds + */ + private static long nanosToMillis(long ns) { + return Math.round(((double) ns) / 1e6); + } + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setUrl(String url); + + abstract String url(); + + abstract Builder setApiKey(String apiKey); + + abstract String apiKey(); + + abstract Builder setMinBatchCount(Integer minBatchCount); + + abstract Integer minBatchCount(); + + abstract Builder setInputBatchCount(@Nullable Integer inputBatchCount); + + abstract Builder setMaxBufferSize(Long maxBufferSize); + + abstract DatadogEventWriter autoBuild(); + + /** + * Method to set the url for Logs API. + * + * @param url for Logs API + * @return {@link Builder} + */ + public Builder withUrl(String url) { + checkArgument(url != null, "withURL(url) called with null input."); + checkArgument(isValidUrlFormat(url), INVALID_URL_FORMAT_MESSAGE); + return setUrl(url); + } + + /** + * Method to set the API key for Logs API. + * + * @param apiKey API key for Logs API + * @return {@link Builder} + */ + public Builder withApiKey(String apiKey) { + checkArgument(apiKey != null, "withApiKey(apiKey) called with null input."); + return setApiKey(apiKey); + } + + /** + * Method to set the inputBatchCount. + * + * @param inputBatchCount for batching post requests. + * @return {@link Builder} + */ + public Builder withInputBatchCount(@Nullable Integer inputBatchCount) { + if (inputBatchCount != null) { + checkArgument( + inputBatchCount >= MoreObjects.firstNonNull(minBatchCount(), MIN_BATCH_COUNT), + "inputBatchCount must be greater than or equal to %s", + minBatchCount()); + checkArgument( + inputBatchCount <= MAX_BATCH_COUNT, + "inputBatchCount must be less than or equal to %s", + MAX_BATCH_COUNT); + } + return setInputBatchCount(inputBatchCount); + } + + /** + * Method to set the maxBufferSize. + * + * @param maxBufferSize for batching post requests. 
+ * @return {@link Builder} + */ + public Builder withMaxBufferSize(@Nullable Long maxBufferSize) { + if (maxBufferSize == null) { + return setMaxBufferSize(MAX_BUFFER_SIZE); + } + return setMaxBufferSize(maxBufferSize); + } + + /** Build a new {@link DatadogEventWriter} objects based on the configuration. */ + public DatadogEventWriter build() { + checkNotNull(url(), "url needs to be provided."); + checkNotNull(apiKey(), "apiKey needs to be provided."); + + return autoBuild(); + } + } +} diff --git a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogIO.java b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogIO.java new file mode 100644 index 000000000000..fa8b6befabad --- /dev/null +++ b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogIO.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.datadog; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@link DatadogIO} class provides a {@link PTransform} that allows writing {@link + * DatadogEvent} messages into a Datadog Logs API end point. + */ +public class DatadogIO { + + private static final Logger LOG = LoggerFactory.getLogger(DatadogIO.class); + + private DatadogIO() {} + + public static Write.Builder writeBuilder() { + return writeBuilder(null); + } + + public static Write.Builder writeBuilder(@Nullable Integer minBatchCount) { + return new AutoValue_DatadogIO_Write.Builder().setMinBatchCount(minBatchCount); + } + + /** + * Class {@link Write} provides a {@link PTransform} that allows writing {@link DatadogEvent} + * records into a Datadog Logs API end-point using HTTP POST requests. In the event of an error, a + * {@link PCollection} of {@link DatadogWriteError} records are returned for further processing or + * storing into a deadletter sink. 
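+   *
+   * <p>A minimal pipeline sketch (the intake URL, API key, and tuning values are illustrative):
+   *
+   * <pre>{@code
+   * PCollection<DatadogEvent> events = ...;
+   * PCollection<DatadogWriteError> errors =
+   *     events.apply(
+   *         DatadogIO.writeBuilder()
+   *             .withUrl("https://http-intake.logs.datadoghq.com")
+   *             .withApiKey(apiKey)
+   *             .withBatchCount(100)
+   *             .withParallelism(4)
+   *             .build());
+   * }</pre>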
+ */ + @AutoValue + public abstract static class Write + extends PTransform, PCollection> { + + abstract String url(); + + abstract String apiKey(); + + @Nullable + abstract Integer minBatchCount(); + + @Nullable + abstract Integer batchCount(); + + @Nullable + abstract Long maxBufferSize(); + + @Nullable + abstract Integer parallelism(); + + @Override + public PCollection expand(PCollection input) { + + LOG.info("Configuring DatadogEventWriter."); + DatadogEventWriter.Builder builder = + DatadogEventWriter.newBuilder(minBatchCount()) + .withMaxBufferSize(maxBufferSize()) + .withUrl(url()) + .withInputBatchCount(batchCount()) + .withApiKey(apiKey()); + + DatadogEventWriter writer = builder.build(); + LOG.info("DatadogEventWriter configured"); + + // Return a PCollection + return input + .apply("Create KV pairs", CreateKeys.of(parallelism())) + .apply("Write Datadog events", ParDo.of(writer)) + .setCoder(DatadogWriteErrorCoder.of()); + } + + /** A builder for creating {@link Write} objects. */ + @AutoValue.Builder + public abstract static class Builder { + + abstract Builder setUrl(String url); + + abstract String url(); + + abstract Builder setApiKey(String apiKey); + + abstract String apiKey(); + + abstract Builder setMinBatchCount(@Nullable Integer minBatchCount); + + abstract Builder setBatchCount(Integer batchCount); + + abstract Builder setMaxBufferSize(Long maxBufferSize); + + abstract Builder setParallelism(Integer parallelism); + + abstract Write autoBuild(); + + /** + * Method to set the url for Logs API. + * + * @param url for Logs API + * @return {@link Builder} + */ + public Builder withUrl(String url) { + checkArgument(url != null, "withURL(url) called with null input."); + return setUrl(url); + } + + /** + * Method to set the API key for Logs API. + * + * @param apiKey API key for Logs API + * @return {@link Builder} + */ + public Builder withApiKey(String apiKey) { + checkArgument(apiKey != null, "withApiKey(apiKey) called with null input."); + return setApiKey(apiKey); + } + + /** + * Method to set the Batch Count. + * + * @param batchCount for batching post requests. + * @return {@link Builder} + */ + public Builder withBatchCount(Integer batchCount) { + checkArgument(batchCount != null, "withBatchCount(batchCount) called with null input."); + return setBatchCount(batchCount); + } + + /** + * Method to set the Max Buffer Size. + * + * @param maxBufferSize for batching post requests. + * @return {@link Builder} + */ + public Builder withMaxBufferSize(Long maxBufferSize) { + checkArgument( + maxBufferSize != null, "withMaxBufferSize(maxBufferSize) called with null input."); + return setMaxBufferSize(maxBufferSize); + } + + /** + * Method to set the parallelism. + * + * @param parallelism for controlling the number of http client connections. 
+ * @return {@link Builder} + */ + public Builder withParallelism(Integer parallelism) { + checkArgument(parallelism != null, "withParallelism(parallelism) called with null input."); + return setParallelism(parallelism); + } + + public Write build() { + checkNotNull(url(), "Logs API url is required."); + checkNotNull(apiKey(), "API key is required."); + + return autoBuild(); + } + } + + private static class CreateKeys + extends PTransform, PCollection>> { + + private static final Integer DEFAULT_PARALLELISM = 1; + + @Nullable private Integer requestedKeys; + + private CreateKeys(@Nullable Integer requestedKeys) { + this.requestedKeys = requestedKeys; + } + + static CreateKeys of(@Nullable Integer requestedKeys) { + return new CreateKeys(requestedKeys); + } + + @Override + public PCollection> expand(PCollection input) { + + return input + .apply("Inject Keys", ParDo.of(new CreateKeysFn(this.requestedKeys))) + .setCoder(KvCoder.of(BigEndianIntegerCoder.of(), DatadogEventCoder.of())); + } + + private static class CreateKeysFn extends DoFn> { + + @Nullable private Integer specifiedParallelism; + private Integer calculatedParallelism; + + CreateKeysFn(@Nullable Integer specifiedParallelism) { + this.specifiedParallelism = specifiedParallelism; + this.calculatedParallelism = + MoreObjects.firstNonNull(specifiedParallelism, DEFAULT_PARALLELISM); + LOG.info("Parallelism set to: {}", calculatedParallelism); + } + + @Setup + public void setup() { + // Initialization is now in the constructor to satisfy static analysis. + } + + @ProcessElement + public void processElement(ProcessContext context) { + context.output( + KV.of(ThreadLocalRandom.current().nextInt(calculatedParallelism), context.element())); + } + } + } + } +} diff --git a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteError.java b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteError.java new file mode 100644 index 000000000000..977873718c65 --- /dev/null +++ b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteError.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.datadog; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import javax.annotation.Nullable; + +/** A class for capturing errors writing {@link DatadogEvent}s to Datadog's Logs API. 
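+ *
+ * <p>One way a pipeline might route these errors to a dead-letter sink; the output path is
+ * illustrative:
+ *
+ * <pre>{@code
+ * errors
+ *     .apply(
+ *         MapElements.into(TypeDescriptors.strings())
+ *             .via(error -> error.statusCode() + " " + error.statusMessage() + " " + error.payload()))
+ *     .apply(TextIO.write().to("gs://my-bucket/datadog-write-errors"));
+ * }</pre>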
*/ +@AutoValue +public abstract class DatadogWriteError { + + public static Builder newBuilder() { + return new AutoValue_DatadogWriteError.Builder(); + } + + @Nullable + public abstract Integer statusCode(); + + @Nullable + public abstract String statusMessage(); + + @Nullable + public abstract String payload(); + + @AutoValue.Builder + abstract static class Builder { + + abstract Builder setStatusCode(Integer statusCode); + + abstract Integer statusCode(); + + abstract Builder setStatusMessage(String statusMessage); + + abstract Builder setPayload(String payload); + + abstract DatadogWriteError autoBuild(); + + public Builder withStatusCode(Integer statusCode) { + checkNotNull(statusCode, "withStatusCode(statusCode) called with null input."); + + return setStatusCode(statusCode); + } + + public Builder withStatusMessage(String statusMessage) { + checkNotNull(statusMessage, "withStatusMessage(statusMessage) called with null input."); + + return setStatusMessage(statusMessage); + } + + public Builder withPayload(String payload) { + checkNotNull(payload, "withPayload(payload) called with null input."); + + return setPayload(payload); + } + + public DatadogWriteError build() { + return autoBuild(); + } + } +} diff --git a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoder.java b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoder.java new file mode 100644 index 000000000000..a634c798518d --- /dev/null +++ b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoder.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.datadog; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** A {@link org.apache.beam.sdk.coders.Coder} for {@link DatadogWriteError} objects. 
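+ *
+ * <p>{@link DatadogIO.Write} sets this coder on its error output automatically, so pipelines
+ * normally do not need to set it themselves.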
+ */
+public class DatadogWriteErrorCoder extends AtomicCoder<DatadogWriteError> {
+
+  private static final DatadogWriteErrorCoder DATADOG_WRITE_ERROR_CODER =
+      new DatadogWriteErrorCoder();
+
+  private static final TypeDescriptor<DatadogWriteError> TYPE_DESCRIPTOR =
+      new TypeDescriptor<DatadogWriteError>() {};
+  private static final StringUtf8Coder STRING_UTF_8_CODER = StringUtf8Coder.of();
+  private static final NullableCoder<String> STRING_NULLABLE_CODER =
+      NullableCoder.of(STRING_UTF_8_CODER);
+  private static final NullableCoder<Integer> INTEGER_NULLABLE_CODER =
+      NullableCoder.of(BigEndianIntegerCoder.of());
+
+  public static DatadogWriteErrorCoder of() {
+    return DATADOG_WRITE_ERROR_CODER;
+  }
+
+  @Override
+  public void encode(DatadogWriteError value, OutputStream out) throws CoderException, IOException {
+    INTEGER_NULLABLE_CODER.encode(value.statusCode(), out);
+    STRING_NULLABLE_CODER.encode(value.statusMessage(), out);
+    STRING_NULLABLE_CODER.encode(value.payload(), out);
+  }
+
+  @Override
+  public DatadogWriteError decode(InputStream in) throws CoderException, IOException {
+
+    DatadogWriteError.Builder builder = DatadogWriteError.newBuilder();
+
+    Integer statusCode = INTEGER_NULLABLE_CODER.decode(in);
+    if (statusCode != null) {
+      builder.withStatusCode(statusCode);
+    }
+
+    String statusMessage = STRING_NULLABLE_CODER.decode(in);
+    if (statusMessage != null) {
+      builder.withStatusMessage(statusMessage);
+    }
+
+    String payload = STRING_NULLABLE_CODER.decode(in);
+    if (payload != null) {
+      builder.withPayload(payload);
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public TypeDescriptor<DatadogWriteError> getEncodedTypeDescriptor() {
+    return TYPE_DESCRIPTOR;
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    throw new NonDeterministicException(
+        this, "DatadogWriteError can hold arbitrary instances, which may be non-deterministic.");
+  }
+}
diff --git a/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/package-info.java b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/package-info.java
new file mode 100644
index 000000000000..fbeed9f1a551
--- /dev/null
+++ b/sdks/java/io/datadog/src/main/java/org/apache/beam/sdk/io/datadog/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for writing to Datadog.
+ *

+ * <p>The {@link org.apache.beam.sdk.io.datadog.DatadogIO} class provides a {@link
+ * org.apache.beam.sdk.transforms.PTransform} that allows writing data to the Datadog Logs API.
+ *
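+ * <p>A minimal usage sketch is shown below; the intake URL and API key are placeholders, and the
+ * builder calls (including the integer argument to {@code writeBuilder}) mirror those exercised
+ * by the unit tests rather than a definitive API surface:
+ *
+ * <pre>{@code
+ * PCollection<DatadogEvent> events = ...;
+ * PCollection<DatadogWriteError> failedWrites =
+ *     events.apply(
+ *         "WriteToDatadog",
+ *         DatadogIO.writeBuilder(1)
+ *             .withUrl("https://http-intake.logs.datadoghq.com")
+ *             .withApiKey("your-api-key")
+ *             .withBatchCount(100)
+ *             .withParallelism(2)
+ *             .build());
+ * }</pre>
+ *
+ * <p>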

For more information on the Datadog Logs API, see the official documentation. + */ +package org.apache.beam.sdk.io.datadog; diff --git a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventCoderTest.java b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventCoderTest.java new file mode 100644 index 000000000000..f1dad0784af3 --- /dev/null +++ b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventCoderTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.datadog; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.junit.Test; + +/** Unit tests for {@link com.google.cloud.teleport.datadog.DatadogEventCoder} class. */ +public class DatadogEventCoderTest { + + /** + * Test whether {@link DatadogEventCoder} is able to encode/decode a {@link DatadogEvent} + * correctly. + * + * @throws IOException + */ + @Test + public void testEncodeDecode() throws IOException { + + String source = "test-source"; + String tags = "test-tags"; + String hostname = "test-hostname"; + String service = "test-service"; + String message = "test-message"; + + DatadogEvent actualEvent = + DatadogEvent.newBuilder() + .withSource(source) + .withTags(tags) + .withHostname(hostname) + .withService(service) + .withMessage(message) + .build(); + + DatadogEventCoder coder = DatadogEventCoder.of(); + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + coder.encode(actualEvent, bos); + try (ByteArrayInputStream bin = new ByteArrayInputStream(bos.toByteArray())) { + DatadogEvent decodedEvent = coder.decode(bin); + assertThat(decodedEvent, is(equalTo(actualEvent))); + } + } + } +} diff --git a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisherTest.java b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisherTest.java new file mode 100644 index 000000000000..17f6e7a6e152 --- /dev/null +++ b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventPublisherTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.datadog; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockserver.integration.ClientAndServer.startClientAndServer; + +import com.google.api.client.http.GenericUrl; +import com.google.api.client.http.HttpContent; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.util.ExponentialBackOff; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.junit.Test; +import org.mockserver.configuration.ConfigurationProperties; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.model.MediaType; +import org.mockserver.verify.VerificationTimes; + +/** Unit tests for {@link DatadogEventPublisher} class. */ +public class DatadogEventPublisherTest { + + private static final String EXPECTED_PATH = "/" + DatadogEventPublisher.DD_URL_PATH; + + private static final DatadogEvent DATADOG_TEST_EVENT_1 = + DatadogEvent.newBuilder() + .withSource("test-source-1") + .withTags("test-tags-1") + .withHostname("test-hostname-1") + .withService("test-service-1") + .withMessage("test-message-1") + .build(); + + private static final DatadogEvent DATADOG_TEST_EVENT_2 = + DatadogEvent.newBuilder() + .withSource("test-source-2") + .withTags("test-tags-2") + .withHostname("test-hostname-2") + .withService("test-service-2") + .withMessage("test-message-2") + .build(); + + private static final List DATADOG_EVENTS = + ImmutableList.of(DATADOG_TEST_EVENT_1, DATADOG_TEST_EVENT_2); + + /** Test whether {@link HttpContent} is created from the list of {@link DatadogEvent}s. 
*/ + @Test + public void contentTest() throws NoSuchAlgorithmException, KeyManagementException, IOException { + + DatadogEventPublisher publisher = + DatadogEventPublisher.newBuilder() + .withUrl("http://example.com") + .withApiKey("test-api-key") + .build(); + + String expectedString = + "[" + + "{\"ddsource\":\"test-source-1\",\"ddtags\":\"test-tags-1\"," + + "\"hostname\":\"test-hostname-1\",\"service\":\"test-service-1\"," + + "\"message\":\"test-message-1\"}," + + "{\"ddsource\":\"test-source-2\",\"ddtags\":\"test-tags-2\"," + + "\"hostname\":\"test-hostname-2\",\"service\":\"test-service-2\"," + + "\"message\":\"test-message-2\"}" + + "]"; + + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + HttpContent actualContent = publisher.getContent(DATADOG_EVENTS); + actualContent.writeTo(bos); + String actualString = new String(bos.toByteArray(), StandardCharsets.UTF_8); + assertThat(actualString, is(equalTo(expectedString))); + } + } + + @Test + public void genericURLTest() throws IOException { + + String baseURL = "http://example.com"; + DatadogEventPublisher.Builder builder = + DatadogEventPublisher.newBuilder().withUrl(baseURL).withApiKey("test-api-key"); + + assertThat( + builder.genericUrl(), + is(equalTo(new GenericUrl(Joiner.on('/').join(baseURL, "api/v2/logs"))))); + } + + @Test + public void configureBackOffDefaultTest() + throws NoSuchAlgorithmException, KeyManagementException, IOException { + + DatadogEventPublisher publisherDefaultBackOff = + DatadogEventPublisher.newBuilder() + .withUrl("http://example.com") + .withApiKey("test-api-key") + .build(); + + assertThat( + publisherDefaultBackOff.getConfiguredBackOff().getMaxElapsedTimeMillis(), + is(equalTo(ExponentialBackOff.DEFAULT_MAX_ELAPSED_TIME_MILLIS))); + } + + @Test + public void configureBackOffCustomTest() + throws NoSuchAlgorithmException, KeyManagementException, IOException { + + int timeoutInMillis = 600000; // 10 minutes + DatadogEventPublisher publisherWithBackOff = + DatadogEventPublisher.newBuilder() + .withUrl("http://example.com") + .withApiKey("test-api-key") + .withMaxElapsedMillis(timeoutInMillis) + .build(); + + assertThat( + publisherWithBackOff.getConfiguredBackOff().getMaxElapsedTimeMillis(), + is(equalTo(timeoutInMillis))); + } + + @Test + public void requestHeadersTest() throws Exception { + ConfigurationProperties.disableSystemOut(true); + try (ClientAndServer mockServer = startClientAndServer()) { + mockServer + .when(org.mockserver.model.HttpRequest.request(EXPECTED_PATH)) + .respond(org.mockserver.model.HttpResponse.response().withStatusCode(202)); + + DatadogEventPublisher publisher = + DatadogEventPublisher.newBuilder() + .withUrl(Joiner.on(':').join("http://localhost", mockServer.getPort())) + .withApiKey("test-api-key") + .build(); + + DatadogEvent event = + DatadogEvent.newBuilder() + .withSource("test-source-1") + .withTags("test-tags-1") + .withHostname("test-hostname-1") + .withService("test-service-1") + .withMessage("test-message-1") + .build(); + + HttpResponse response = publisher.execute(ImmutableList.of(event)); + assertThat(response.getStatusCode(), is(equalTo(202))); + + mockServer.verify( + org.mockserver.model.HttpRequest.request(EXPECTED_PATH) + .withContentType(MediaType.APPLICATION_JSON) + .withHeader("dd-api-key", "test-api-key") + .withHeader("dd-evp-origin", "dataflow") + .withHeader("Accept-Encoding", "gzip"), + VerificationTimes.once()); + } + } +} diff --git a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializerTest.java 
b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializerTest.java new file mode 100644 index 000000000000..15b127da2f01 --- /dev/null +++ b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventSerializerTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.datadog; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.List; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.junit.Test; + +public class DatadogEventSerializerTest { + + private static final DatadogEvent DATADOG_TEST_EVENT_1 = + DatadogEvent.newBuilder() + .withSource("test-source-1") + .withTags("test-tags-1") + .withHostname("test-hostname-1") + .withService("test-service-1") + .withMessage("test-message-1") + .build(); + + private static final DatadogEvent DATADOG_TEST_EVENT_2 = + DatadogEvent.newBuilder() + .withSource("test-source-2") + .withTags("test-tags-2") + .withHostname("test-hostname-2") + .withService("test-service-2") + .withMessage("test-message-2") + .build(); + + private static final List DATADOG_EVENTS = + ImmutableList.of(DATADOG_TEST_EVENT_1, DATADOG_TEST_EVENT_2); + + /** Test whether payload is stringified as expected. */ + @Test + public void stringPayloadTest_list() { + String actual = DatadogEventSerializer.getPayloadString(DATADOG_EVENTS); + + String expected = + "[" + + "{\"ddsource\":\"test-source-1\",\"ddtags\":\"test-tags-1\"," + + "\"hostname\":\"test-hostname-1\",\"service\":\"test-service-1\"," + + "\"message\":\"test-message-1\"}," + + "{\"ddsource\":\"test-source-2\",\"ddtags\":\"test-tags-2\"," + + "\"hostname\":\"test-hostname-2\",\"service\":\"test-service-2\"," + + "\"message\":\"test-message-2\"}" + + "]"; + + assertThat(expected, is(equalTo(actual))); + } + + /** Test whether payload is stringified as expected. */ + @Test + public void stringPayloadTest_single() { + String actual = DatadogEventSerializer.getPayloadString(DATADOG_TEST_EVENT_1); + + String expected = + "{\"ddsource\":\"test-source-1\",\"ddtags\":\"test-tags-1\"," + + "\"hostname\":\"test-hostname-1\",\"service\":\"test-service-1\"," + + "\"message\":\"test-message-1\"}"; + + assertThat(expected, is(equalTo(actual))); + } + + /** Test payload size calculation for a payload string. 
*/ + @Test + public void stringPayloadSizeTest() { + long actual = + DatadogEventSerializer.getPayloadSize( + "{\"ddsource\":\"test-source-1\",\"ddtags\":\"test-tags-1\"," + + "\"hostname\":\"test-hostname-1\",\"service\":\"test-service-1\"," + + "\"message\":\"test-message-1\"}"); + + long expected = 134L; + + assertThat(expected, is(equalTo(actual))); + } +} diff --git a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventTest.java b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventTest.java new file mode 100644 index 000000000000..de1759faafbe --- /dev/null +++ b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.datadog; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; + +import org.junit.Test; + +/** Unit tests for {@link DatadogEvent} class. */ +public class DatadogEventTest { + + /** Test whether a {@link DatadogEvent} created via its builder can be compared correctly. */ + @Test + public void testEquals() { + String source = "test-source"; + String tags = "test-tags"; + String hostname = "test-hostname"; + String service = "test-service"; + String message = "test-message"; + + DatadogEvent actualEvent = + DatadogEvent.newBuilder() + .withSource(source) + .withTags(tags) + .withHostname(hostname) + .withService(service) + .withMessage(message) + .build(); + + assertThat( + actualEvent, + is( + equalTo( + DatadogEvent.newBuilder() + .withSource(source) + .withTags(tags) + .withHostname(hostname) + .withService(service) + .withMessage(message) + .build()))); + + assertThat( + actualEvent, + is( + not( + equalTo( + DatadogEvent.newBuilder() + .withSource(source) + .withTags(tags) + .withHostname(hostname) + .withService(service) + .withMessage("a-different-test-message") + .build())))); + } +} diff --git a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventWriterTest.java b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventWriterTest.java new file mode 100644 index 000000000000..086bb93f53e1 --- /dev/null +++ b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogEventWriterTest.java @@ -0,0 +1,566 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.datadog; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockserver.integration.ClientAndServer.startClientAndServer; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockserver.configuration.ConfigurationProperties; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.matchers.Times; +import org.mockserver.model.HttpRequest; +import org.mockserver.model.HttpResponse; +import org.mockserver.verify.VerificationTimes; + +/** Unit tests for {@link com.google.cloud.teleport.datadog.DatadogEventWriter} class. */ +public class DatadogEventWriterTest { + + private static final String EXPECTED_PATH = "/" + DatadogEventPublisher.DD_URL_PATH; + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + // We create a MockServerRule to simulate an actual Datadog API server. + private ClientAndServer mockServer; + + @Before + public void setup() { + ConfigurationProperties.disableSystemOut(true); + mockServer = startClientAndServer(); + } + + @After + public void tearDown() { + if (mockServer != null) { + mockServer.stop(); + } + } + + /** Test building {@link DatadogEventWriter} with missing URL. */ + @Test + public void eventWriterMissingURL() { + + Exception thrown = + assertThrows(NullPointerException.class, () -> DatadogEventWriter.newBuilder().build()); + + assertThat(thrown).hasMessageThat().contains("url needs to be provided"); + } + + /** Test building {@link DatadogEventWriter} with missing URL protocol. */ + @Test + public void eventWriterMissingURLProtocol() { + + Exception thrown = + assertThrows( + IllegalArgumentException.class, + () -> DatadogEventWriter.newBuilder().withUrl("test-url").build()); + + assertThat(thrown).hasMessageThat().contains(DatadogEventWriter.INVALID_URL_FORMAT_MESSAGE); + } + + /** Test building {@link DatadogEventWriter} with an invalid URL. 
*/ + @Test + public void eventWriterInvalidURL() { + + Exception thrown = + assertThrows( + IllegalArgumentException.class, + () -> DatadogEventWriter.newBuilder().withUrl("http://1.2.3").build()); + + assertThat(thrown).hasMessageThat().contains(DatadogEventWriter.INVALID_URL_FORMAT_MESSAGE); + } + + /** Test building {@link DatadogEventWriter} with the 'api/v2/logs' path appended to the URL. */ + @Test + public void eventWriterFullEndpoint() { + + Exception thrown = + assertThrows( + IllegalArgumentException.class, + () -> + DatadogEventWriter.newBuilder() + .withUrl("http://test-url:8088/api/v2/logs") + .build()); + + assertThat(thrown).hasMessageThat().contains(DatadogEventWriter.INVALID_URL_FORMAT_MESSAGE); + } + + /** Test building {@link DatadogEventWriter} with missing token. */ + @Test + public void eventWriterMissingToken() { + + Exception thrown = + assertThrows( + NullPointerException.class, + () -> DatadogEventWriter.newBuilder().withUrl("http://test-url").build()); + + assertThat(thrown).hasMessageThat().contains("apiKey needs to be provided"); + } + + /** Test building {@link DatadogEventWriter} with default batch count. */ + @Test + public void eventWriterDefaultBatchCount() { + + DatadogEventWriter writer = + DatadogEventWriter.newBuilder() + .withUrl("http://test-url") + .withApiKey("test-api-key") + .build(); + + assertThat(writer.inputBatchCount()).isNull(); + } + + /** + * Test building {@link DatadogEventWriter} with a batchCount less than the configured minimum. + */ + @Test + public void eventWriterBatchCountTooSmall() { + + Exception thrown = + assertThrows( + IllegalArgumentException.class, + () -> + DatadogEventWriter.newBuilder(7) + .withUrl("http://test-url") + .withApiKey("test-api-key") + .withInputBatchCount(6) + .build()); + + assertThat(thrown) + .hasMessageThat() + .contains("inputBatchCount must be greater than or equal to 7"); + } + + /** Test building {@link DatadogEventWriter} with a batchCount greater than 1000. */ + @Test + public void eventWriterBatchCountTooBig() { + + Exception thrown = + assertThrows( + IllegalArgumentException.class, + () -> + DatadogEventWriter.newBuilder() + .withUrl("http://test-url") + .withApiKey("test-api-key") + .withInputBatchCount(1001) + .build()); + + assertThat(thrown) + .hasMessageThat() + .contains("inputBatchCount must be less than or equal to 1000"); + } + + /** Test building {@link DatadogEventWriter} with custom batchCount . */ + @Test + public void eventWriterCustomBatchCountAndValidation() { + + Integer batchCount = 30; + DatadogEventWriter writer = + DatadogEventWriter.newBuilder() + .withUrl("http://test-url") + .withApiKey("test-api-key") + .withInputBatchCount(batchCount) + .build(); + + assertThat(writer.inputBatchCount()).isEqualTo(batchCount); + } + + /** Test building {@link DatadogEventWriter} with default maxBufferSize . */ + @Test + public void eventWriterDefaultMaxBufferSize() { + + DatadogEventWriter writer = + DatadogEventWriter.newBuilder() + .withUrl("http://test-url") + .withApiKey("test-api-key") + .build(); + + assertThat(writer.maxBufferSize()).isNull(); + } + + /** Test building {@link DatadogEventWriter} with custom maxBufferSize . 
*/ + @Test + public void eventWriterCustomMaxBufferSizeAndValidation() { + + Long maxBufferSize = 1_427_841L; + DatadogEventWriter writer = + DatadogEventWriter.newBuilder() + .withUrl("http://test-url") + .withMaxBufferSize(maxBufferSize) + .withApiKey("test-api-key") + .build(); + + assertThat(writer.maxBufferSize()).isEqualTo(maxBufferSize); + } + + /** Test successful POST request for single batch. */ + @Test + @Category(NeedsRunner.class) + public void successfulDatadogWriteSingleBatchTest() { + + // Create server expectation for success. + addRequestExpectation(202); + + int testPort = mockServer.getPort(); + + List> testEvents = + ImmutableList.of( + KV.of( + 123, + DatadogEvent.newBuilder() + .withSource("test-source-1") + .withTags("test-tags-1") + .withHostname("test-hostname-1") + .withService("test-service-1") + .withMessage("test-message-1") + .build()), + KV.of( + 123, + DatadogEvent.newBuilder() + .withSource("test-source-2") + .withTags("test-tags-2") + .withHostname("test-hostname-2") + .withService("test-service-2") + .withMessage("test-message-2") + .build())); + + PCollection actual = + pipeline + .apply( + "Create Input data", + Create.of(testEvents) + .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), DatadogEventCoder.of()))) + .apply( + "DatadogEventWriter", + ParDo.of( + DatadogEventWriter.newBuilder(1) + .withUrl(Joiner.on(':').join("http://localhost", testPort)) + .withInputBatchCount(1) // Test one request per DatadogEvent + .withApiKey("test-api-key") + .build())) + .setCoder(DatadogWriteErrorCoder.of()); + + // All successful responses. + PAssert.that(actual).empty(); + + pipeline.run(); + + // Server received exactly the expected number of POST requests. + mockServer.verify( + HttpRequest.request(EXPECTED_PATH), VerificationTimes.exactly(testEvents.size())); + } + + /** Test successful POST request for multi batch. */ + @Test + @Category(NeedsRunner.class) + public void successfulDatadogWriteMultiBatchTest() { + + // Create server expectation for success. + addRequestExpectation(202); + + int testPort = mockServer.getPort(); + + List> testEvents = + ImmutableList.of( + KV.of( + 123, + DatadogEvent.newBuilder() + .withSource("test-source-1") + .withTags("test-tags-1") + .withHostname("test-hostname-1") + .withService("test-service-1") + .withMessage("test-message-1") + .build()), + KV.of( + 123, + DatadogEvent.newBuilder() + .withSource("test-source-2") + .withTags("test-tags-2") + .withHostname("test-hostname-2") + .withService("test-service-2") + .withMessage("test-message-2") + .build())); + + PCollection actual = + pipeline + .apply( + "Create Input data", + Create.of(testEvents) + .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), DatadogEventCoder.of()))) + .apply( + "DatadogEventWriter", + ParDo.of( + DatadogEventWriter.newBuilder(1) + .withUrl(Joiner.on(':').join("http://localhost", testPort)) + .withInputBatchCount(testEvents.size()) // all requests in a single batch. + .withApiKey("test-api-key") + .build())) + .setCoder(DatadogWriteErrorCoder.of()); + + // All successful responses. + PAssert.that(actual).empty(); + + pipeline.run(); + + // Server received exactly one POST request. + mockServer.verify(HttpRequest.request(EXPECTED_PATH), VerificationTimes.once()); + } + + /** Test successful POST requests for batch exceeding max buffer size. */ + @Test + @Category(NeedsRunner.class) + public void successfulDatadogWriteExceedingMaxBufferSize() { + + // Create server expectation for success. 
+ addRequestExpectation(202); + + int testPort = mockServer.getPort(); + + String payloadFormat = "{\"message\":\"%s\"}"; + long jsonSize = DatadogEventSerializer.getPayloadSize(String.format(payloadFormat, "")); + + long maxBufferSize = 100; + long msgSize = 50; + + char[] bunchOfAs = new char[(int) (msgSize - jsonSize)]; + Arrays.fill(bunchOfAs, 'a'); + + List> testEvents = new ArrayList<>(); + for (int i = 1; i <= 3; i++) { + testEvents.add( + KV.of(123, DatadogEvent.newBuilder().withMessage(new String(bunchOfAs)).build())); + } + + PCollection actual = + pipeline + .apply( + "Create Input data", + Create.of(testEvents) + .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), DatadogEventCoder.of()))) + .apply( + "DatadogEventWriter", + ParDo.of( + DatadogEventWriter.newBuilder(1) + .withUrl(Joiner.on(':').join("http://localhost", testPort)) + .withInputBatchCount(testEvents.size()) + .withMaxBufferSize(maxBufferSize) + .withApiKey("test-api-key") + .build())) + .setCoder(DatadogWriteErrorCoder.of()); + + // All successful responses. + PAssert.that(actual).empty(); + + pipeline.run(); + + // Server received exactly two POST requests: + // 1st batch of size=2 due to next msg exceeding max buffer size + // 2nd batch of size=1 due to timer + mockServer.verify(HttpRequest.request(EXPECTED_PATH), VerificationTimes.exactly(2)); + } + + /** Test failed POST request. */ + @Test + @Category(NeedsRunner.class) + public void failedDatadogWriteSingleBatchTest() { + + // Create server expectation for FAILURE. + addRequestExpectation(404); + + int testPort = mockServer.getPort(); + + List> testEvents = + ImmutableList.of( + KV.of( + 123, + DatadogEvent.newBuilder() + .withSource("test-source-1") + .withTags("test-tags-1") + .withHostname("test-hostname-1") + .withService("test-service-1") + .withMessage("test-message-1") + .build())); + + PCollection actual = + pipeline + .apply( + "Create Input data", + Create.of(testEvents) + .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), DatadogEventCoder.of()))) + .apply( + "DatadogEventWriter", + ParDo.of( + DatadogEventWriter.newBuilder(1) + .withUrl(Joiner.on(':').join("http://localhost", testPort)) + .withInputBatchCount(testEvents.size()) // all requests in a single batch. + .withApiKey("test-api-key") + .build())) + .setCoder(DatadogWriteErrorCoder.of()); + + // Expect a single 404 Not found DatadogWriteError + PAssert.that(actual) + .containsInAnyOrder( + DatadogWriteError.newBuilder() + .withStatusCode(404) + .withStatusMessage("Not Found") + .withPayload( + "{\"ddsource\":\"test-source-1\"," + + "\"ddtags\":\"test-tags-1\",\"hostname\":\"test-hostname-1\"," + + "\"service\":\"test-service-1\",\"message\":\"test-message-1\"}") + .build()); + + pipeline.run(); + + // Server received exactly one POST request. + mockServer.verify(HttpRequest.request(EXPECTED_PATH), VerificationTimes.once()); + } + + /** Test failed due to single event exceeding max buffer size. */ + @Test + @Category(NeedsRunner.class) + public void failedDatadogEventTooBig() { + + // Create server expectation for FAILURE. 
+ addRequestExpectation(404); + + int testPort = mockServer.getPort(); + + String payloadFormat = "{\"message\":\"%s\"}"; + + long maxBufferSize = 100; + char[] bunchOfAs = + new char + [(int) + (maxBufferSize + + 1L + - DatadogEventSerializer.getPayloadSize(String.format(payloadFormat, "")))]; + Arrays.fill(bunchOfAs, 'a'); + String messageTooBig = new String(bunchOfAs); + + String expectedPayload = String.format(payloadFormat, messageTooBig); + long expectedPayloadSize = DatadogEventSerializer.getPayloadSize(expectedPayload); + assertThat(maxBufferSize + 1L).isEqualTo(expectedPayloadSize); + + List> testEvents = + ImmutableList.of(KV.of(123, DatadogEvent.newBuilder().withMessage(messageTooBig).build())); + + PCollection actual = + pipeline + .apply( + "Create Input data", + Create.of(testEvents) + .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), DatadogEventCoder.of()))) + .apply( + "DatadogEventWriter", + ParDo.of( + DatadogEventWriter.newBuilder() + .withUrl(Joiner.on(':').join("http://localhost", testPort)) + .withMaxBufferSize(maxBufferSize) + .withApiKey("test-api-key") + .build())) + .setCoder(DatadogWriteErrorCoder.of()); + + // Expect a single DatadogWriteError due to exceeding max buffer size + PAssert.that(actual) + .containsInAnyOrder(DatadogWriteError.newBuilder().withPayload(expectedPayload).build()); + + pipeline.run(); + + // Server did not receive any requests. + mockServer.verify(HttpRequest.request(EXPECTED_PATH), VerificationTimes.exactly(0)); + } + + /** Test retryable POST request. */ + @Test + @Category(NeedsRunner.class) + public void retryableDatadogWriteSingleBatchTest() { + + // Create server expectations for 3 retryable failures, 1 success. + addRequestExpectation(408, Times.once()); + addRequestExpectation(429, Times.once()); + addRequestExpectation(502, Times.once()); + addRequestExpectation(202, Times.once()); + + int testPort = mockServer.getPort(); + + List> testEvents = + ImmutableList.of( + KV.of( + 123, + DatadogEvent.newBuilder() + .withSource("test-source-1") + .withTags("test-tags-1") + .withHostname("test-hostname-1") + .withService("test-service-1") + .withMessage("test-message-1") + .build())); + + PCollection actual = + pipeline + .apply( + "Create Input data", + Create.of(testEvents) + .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), DatadogEventCoder.of()))) + .apply( + "DatadogEventWriter", + ParDo.of( + DatadogEventWriter.newBuilder(1) + .withUrl(Joiner.on(':').join("http://localhost", testPort)) + .withInputBatchCount(testEvents.size()) // all requests in a single batch. + .withApiKey("test-api-key") + .build())) + .setCoder(DatadogWriteErrorCoder.of()); + + PAssert.that(actual).empty(); + + // All successful responses, eventually. + pipeline.run(); + + // Server received exactly 4 POST requests (3 retryable failures, 1 success). 
+ mockServer.verify(HttpRequest.request(EXPECTED_PATH), VerificationTimes.exactly(4)); + } + + private void addRequestExpectation(int statusCode) { + addRequestExpectation(statusCode, Times.unlimited()); + } + + private void addRequestExpectation(int statusCode, Times times) { + mockServer + .when(HttpRequest.request(EXPECTED_PATH), times) + .respond(HttpResponse.response().withStatusCode(statusCode)); + } +} diff --git a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogIOTest.java b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogIOTest.java new file mode 100644 index 000000000000..8680333b4dda --- /dev/null +++ b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogIOTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.datadog; + +import static org.mockserver.integration.ClientAndServer.startClientAndServer; + +import java.io.IOException; +import java.util.List; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockserver.configuration.ConfigurationProperties; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.model.HttpRequest; +import org.mockserver.model.HttpResponse; +import org.mockserver.verify.VerificationTimes; + +/** Unit tests for {@link com.google.cloud.teleport.datadog.DatadogIO} class. 
*/ +public class DatadogIOTest { + + private static final DatadogEvent DATADOG_TEST_EVENT_1 = + DatadogEvent.newBuilder() + .withSource("test-source-1") + .withTags("test-tags-1") + .withHostname("test-hostname-1") + .withService("test-service-1") + .withMessage("test-message-1") + .build(); + + private static final DatadogEvent DATADOG_TEST_EVENT_2 = + DatadogEvent.newBuilder() + .withSource("test-source-2") + .withTags("test-tags-2") + .withHostname("test-hostname-2") + .withService("test-service-2") + .withMessage("test-message-2") + .build(); + + private static final List DATADOG_EVENTS = + ImmutableList.of(DATADOG_TEST_EVENT_1, DATADOG_TEST_EVENT_2); + + private static final String EXPECTED_PATH = "/" + DatadogEventPublisher.DD_URL_PATH; + private static final int TEST_PARALLELISM = 2; + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + // We create a mock server to simulate an actual Datadog API server. + private ClientAndServer mockServer; + + @Before + public void setup() throws IOException { + ConfigurationProperties.disableSystemOut(true); + mockServer = startClientAndServer(); + } + + /** Test successful multi-event POST request for DatadogIO without parallelism. */ + @Test + @Category(NeedsRunner.class) + public void successfulDatadogIOMultiBatchNoParallelismTest() { + + // Create server expectation for success. + mockServerListening(200); + PCollection actual = + pipeline + .apply("Create Input data", Create.of(DATADOG_EVENTS).withCoder(DatadogEventCoder.of())) + .apply( + "DatadogIO", + DatadogIO.writeBuilder(1) + .withParallelism(1) + .withBatchCount(DATADOG_EVENTS.size()) + .withApiKey("test-api-key") + .withUrl(Joiner.on(':').join("http://localhost", mockServer.getPort())) + .build()) + .setCoder(DatadogWriteErrorCoder.of()); + + // All successful responses. + PAssert.that(actual).empty(); + + pipeline.run(); + + // Server received exactly one POST request. + mockServer.verify(HttpRequest.request(EXPECTED_PATH), VerificationTimes.once()); + } + + /** Test successful multi-event POST request for DatadogIO with parallelism. */ + @Test + @Category(NeedsRunner.class) + public void successfulDatadogIOMultiBatchParallelismTest() { + + // Create server expectation for success. + mockServerListening(200); + PCollection actual = + pipeline + .apply("Create Input data", Create.of(DATADOG_EVENTS).withCoder(DatadogEventCoder.of())) + .apply( + "DatadogIO", + DatadogIO.writeBuilder(1) + .withParallelism(TEST_PARALLELISM) + .withBatchCount(DATADOG_EVENTS.size()) + .withApiKey("test-api-key") + .withUrl(Joiner.on(':').join("http://localhost", mockServer.getPort())) + .build()) + .setCoder(DatadogWriteErrorCoder.of()); + + // All successful responses. + PAssert.that(actual).empty(); + + pipeline.run(); + + // Server received exactly one POST request per parallelism + mockServer.verify(HttpRequest.request(EXPECTED_PATH), VerificationTimes.atLeast(1)); + } + + /** Test successful multi-event POST request for DatadogIO with parallelism. */ + @Test + @Category(NeedsRunner.class) + public void successfulDatadogIOSingleBatchParallelismTest() { + + // Create server expectation for success. 
+ mockServerListening(200); + PCollection actual = + pipeline + .apply("Create Input data", Create.of(DATADOG_EVENTS).withCoder(DatadogEventCoder.of())) + .apply( + "DatadogIO", + DatadogIO.writeBuilder(1) + .withParallelism(TEST_PARALLELISM) + .withBatchCount(1) + .withApiKey("test-api-key") + .withUrl(Joiner.on(':').join("http://localhost", mockServer.getPort())) + .build()) + .setCoder(DatadogWriteErrorCoder.of()); + + // All successful responses. + PAssert.that(actual).empty(); + + pipeline.run(); + + // Server received exactly 1 post request per DatadogEvent + mockServer.verify( + HttpRequest.request(EXPECTED_PATH), VerificationTimes.exactly(DATADOG_EVENTS.size())); + } + + private void mockServerListening(int statusCode) { + mockServer + .when(HttpRequest.request(EXPECTED_PATH)) + .respond(HttpResponse.response().withStatusCode(statusCode)); + } +} diff --git a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoderTest.java b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoderTest.java new file mode 100644 index 000000000000..e5932d2b6120 --- /dev/null +++ b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorCoderTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.datadog; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.junit.Test; + +/** Unit tests for {@link com.google.cloud.teleport.datadog.DatadogWriteErrorCoder} class. */ +public class DatadogWriteErrorCoderTest { + + /** + * Test whether {@link DatadogWriteErrorCoder} is able to encode/decode a {@link + * DatadogWriteError} correctly. 
+ * + * @throws IOException + */ + @Test + public void testEncodeDecode() throws IOException { + + String payload = "test-payload"; + String message = "test-message"; + Integer statusCode = 123; + + DatadogWriteError actualError = + DatadogWriteError.newBuilder() + .withPayload(payload) + .withStatusCode(statusCode) + .withStatusMessage(message) + .build(); + + DatadogWriteErrorCoder coder = DatadogWriteErrorCoder.of(); + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + coder.encode(actualError, bos); + try (ByteArrayInputStream bin = new ByteArrayInputStream(bos.toByteArray())) { + DatadogWriteError decodedWriteError = coder.decode(bin); + assertThat(decodedWriteError, is(equalTo(actualError))); + } + } + } +} diff --git a/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorTest.java b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorTest.java new file mode 100644 index 000000000000..0aadc1f7018d --- /dev/null +++ b/sdks/java/io/datadog/src/test/java/org/apache/beam/sdk/io/datadog/DatadogWriteErrorTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.datadog; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; + +import org.junit.Test; + +/** Unit tests for {@link DatadogWriteError} class. */ +public class DatadogWriteErrorTest { + + /** Test whether a {@link DatadogWriteError} created via its builder can be compared correctly. 
*/ + @Test + public void testEquals() { + + String payload = "test-payload"; + String message = "test-message"; + Integer statusCode = 123; + + DatadogWriteError actualError = + DatadogWriteError.newBuilder() + .withPayload(payload) + .withStatusCode(statusCode) + .withStatusMessage(message) + .build(); + + assertThat( + actualError, + is( + equalTo( + DatadogWriteError.newBuilder() + .withPayload(payload) + .withStatusCode(statusCode) + .withStatusMessage(message) + .build()))); + + assertThat( + actualError, + is( + not( + equalTo( + DatadogWriteError.newBuilder() + .withPayload(payload) + .withStatusCode(statusCode) + .withStatusMessage("a-different-message") + .build())))); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index d99000383ea7..4540fa4b597b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -225,6 +225,7 @@ include(":sdks:java:io:file-based-io-tests") include(":sdks:java:io:bigquery-io-perf-tests") include(":sdks:java:io:cdap") include(":sdks:java:io:csv") +include(":sdks:java:io:datadog") include(":sdks:java:io:file-schema-transform") include(":sdks:java:io:google-ads") include(":sdks:java:io:google-cloud-platform")