Skip to content

Commit

Permalink
Adds max_request_length as a configuration for the http, otel_trace_s…
Browse files Browse the repository at this point in the history
…ource, otel_metrics_source, and otel_logs_source sources. Resolves opensearch-project#3931

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable committed Jan 12, 2024
1 parent 787064e commit b5f24c3
Show file tree
Hide file tree
Showing 17 changed files with 192 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,20 @@ public static ByteCount parse(final String string) {
return new ByteCount(byteCount.longValue());
}

/**
* Returns a {@link ByteCount} with the total number of bytes provided.
*
* @param bytes The number of bytes
* @return A new {@link ByteCount}
* @since 2.7
*/
public static ByteCount ofBytes(final long bytes) {
if(bytes < 0)
throw new IllegalArgumentException("The argument provided for bytes is negative.");

return new ByteCount(bytes);
}

public static ByteCount zeroBytes() {
return ZERO_BYTES;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,21 @@ void parse_returns_rounded_bytes_for_implicit_fractional_bytes(final String byte
assertThat(byteCount.getBytes(), equalTo(expectedBytes));
}

@ParameterizedTest
@ValueSource(longs = {0, 1, 2, 1024, Integer.MAX_VALUE, (long) Integer.MAX_VALUE + 100})
void ofBytes_returns_with_same_bytes(final long bytes) {
final ByteCount byteCount = ByteCount.ofBytes(bytes);

assertThat(byteCount, notNullValue());
assertThat(byteCount.getBytes(), equalTo(bytes));
}

@ParameterizedTest
@ValueSource(longs = {-1, -2, -1024, Integer.MIN_VALUE, (long) Integer.MIN_VALUE - 100})
void ofBytes_throws_with_invalid_bytes(final long bytes) {
assertThrows(IllegalArgumentException.class, () -> ByteCount.ofBytes(bytes));
}

@Test
void zeroBytes_returns_bytes_with_getBytes_equal_to_0() {
assertThat(ByteCount.zeroBytes(), notNullValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.parser.ByteCountDeserializer;
import org.opensearch.dataprepper.parser.DataPrepperDurationDeserializer;
import org.springframework.context.annotation.Bean;

Expand Down Expand Up @@ -33,6 +35,7 @@ ObjectMapper extensionPluginConfigObjectMapper() {
ObjectMapper pluginConfigObjectMapper(final VariableExpander variableExpander) {
final SimpleModule simpleModule = new SimpleModule();
simpleModule.addDeserializer(Duration.class, new DataPrepperDurationDeserializer());
simpleModule.addDeserializer(ByteCount.class, new ByteCountDeserializer());
TRANSLATE_VALUE_SUPPORTED_JAVA_TYPES.stream().forEach(clazz -> simpleModule.addDeserializer(
clazz, new DataPrepperScalarTypeDeserializer<>(variableExpander, clazz)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper;

import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.server.HttpStatusException;
import com.linecorp.armeria.server.RequestTimeoutException;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.ExceptionHandlerFunction;
Expand Down Expand Up @@ -56,6 +57,9 @@ public HttpResponse handleException(final ServiceRequestContext ctx, final HttpR
}

private HttpStatus handleException(final Throwable e) {
if(e instanceof HttpStatusException) {
return ((HttpStatusException) e).httpStatus();
}
if (e instanceof IOException) {
badRequestsCounter.increment();
return HttpStatus.BAD_REQUEST;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
package org.opensearch.dataprepper;

import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.server.HttpStatusException;
import com.linecorp.armeria.server.RequestTimeoutException;
import com.linecorp.armeria.server.ServiceRequestContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
import com.linecorp.armeria.common.AggregatedHttpResponse;
Expand All @@ -24,7 +28,11 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -163,6 +171,18 @@ public void testHandleSizeOverflowException() throws ExecutionException, Interru
verify(requestsTooLargeCounter, times(2)).increment();
}

@ParameterizedTest
@ValueSource(ints = {413, 429})
void handleException_with_HttpStatusException(final int statusCode) throws ExecutionException, InterruptedException {
final HttpStatus httpStatus = HttpStatus.valueOf(statusCode);
final HttpStatusException httpStatusException = mock(HttpStatusException.class);
when(httpStatusException.httpStatus()).thenReturn(httpStatus);
final HttpResponse httpResponse = httpRequestExceptionHandler.handleException(serviceRequestContext, httpRequest, httpStatusException);

assertThat(httpResponse, notNullValue());
assertThat(httpResponse.aggregate().get().status(), equalTo(httpStatus));
}

@Test
public void testHandleUnknownException() throws ExecutionException, InterruptedException {
// Prepare
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ public void start(final Buffer<Record<Log>> buffer) {

sb.maxNumConnections(sourceConfig.getMaxConnectionCount());
sb.requestTimeout(Duration.ofMillis(sourceConfig.getRequestTimeoutInMillis()));
if(sourceConfig.getMaxRequestLength() != null) {
sb.maxRequestLength(sourceConfig.getMaxRequestLength().getBytes());
}
final int threads = sourceConfig.getThreadCount();
final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor(threads);
sb.blockingTaskExecutor(blockingTaskExecutor, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.source.loghttp;

import jakarta.validation.constraints.Size;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -92,6 +93,9 @@ public class HTTPSourceConfig {
@JsonProperty(COMPRESSION)
private CompressionOption compression = CompressionOption.NONE;

@JsonProperty("max_request_length")
private ByteCount maxRequestLength;

private PluginModel authentication;

public boolean isSslCertAndKeyFileInS3() {
Expand Down Expand Up @@ -217,4 +221,8 @@ public boolean isUnauthenticatedHealthCheck() {
public CompressionOption getCompression() {
return compression;
}

public ByteCount getMaxRequestLength() {
return maxRequestLength;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.dataprepper.model.log.Log;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import com.linecorp.armeria.client.ClientFactory;
import com.linecorp.armeria.client.ResponseTimeoutException;
Expand Down Expand Up @@ -72,6 +73,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -742,4 +744,32 @@ public void testRunAnotherSourceWithSamePort() {
//Expect RuntimeException because when port is already in use, BindException is thrown which is not RuntimeException
Assertions.assertThrows(RuntimeException.class, () -> secondSource.start(testBuffer));
}

@Test
public void request_that_exceeds_maxRequestLength_returns_413() {
lenient().when(sourceConfig.getMaxRequestLength()).thenReturn(ByteCount.ofBytes(4));
HTTPSourceUnderTest = new HTTPSource(sourceConfig, pluginMetrics, pluginFactory, pipelineDescription);
// Prepare
final String testData = "[{\"log\": \"somelog\"}]";

assertThat((long) testData.getBytes().length, greaterThan(sourceConfig.getMaxRequestLength().getBytes()));
HTTPSourceUnderTest.start(testBuffer);
refreshMeasurements();

// When
WebClient.of().execute(RequestHeaders.builder()
.scheme(SessionProtocol.HTTP)
.authority("127.0.0.1:2021")
.method(HttpMethod.POST)
.path("/log/ingest")
.contentType(MediaType.JSON_UTF_8)
.build(),
HttpData.ofUtf8(testData))
.aggregate()
.whenComplete((i, ex) -> assertSecureResponseWithStatusCode(i, HttpStatus.REQUEST_ENTITY_TOO_LARGE)).join();

// Then
Assertions.assertTrue(testBuffer.isEmpty());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ public void start(Buffer<Record<Object>> buffer) {
sb.service(grpcServiceBuilder.build(), DecodingService.newDecorator());
}
sb.requestTimeoutMillis(oTelLogsSourceConfig.getRequestTimeoutInMillis());
if(oTelLogsSourceConfig.getMaxRequestLength() != null) {
sb.maxRequestLength(oTelLogsSourceConfig.getMaxRequestLength().getBytes());
}

// ACM Cert for SSL takes preference
if (oTelLogsSourceConfig.isSsl() || oTelLogsSourceConfig.useAcmCertForSSL()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Size;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.model.configuration.PluginModel;

Expand Down Expand Up @@ -100,6 +101,9 @@ public class OTelLogsSourceConfig {
@JsonProperty(COMPRESSION)
private CompressionOption compression = CompressionOption.NONE;

@JsonProperty("max_request_length")
private ByteCount maxRequestLength;

@AssertTrue(message = "path should start with /")
boolean isPathValid() {
return path == null || path.startsWith("/");
Expand Down Expand Up @@ -209,5 +213,9 @@ public int getMaxConnectionCount() {
public CompressionOption getCompression() {
return compression;
}

public ByteCount getMaxRequestLength() {
return maxRequestLength;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.GrpcBasicAuthenticationProvider;
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
import org.opensearch.dataprepper.plugins.certificate.CertificateProvider;
Expand Down Expand Up @@ -761,6 +762,25 @@ void gRPC_request_returns_expected_status_for_exceptions_from_buffer(
assertThat(actualException.getStatus().getCode(), equalTo(expectedStatusCode));
}

@Test
void request_that_exceeds_maxRequestLength_returns_413() throws InvalidProtocolBufferException {
when(oTelLogsSourceConfig.enableUnframedRequests()).thenReturn(true);
when(oTelLogsSourceConfig.getMaxRequestLength()).thenReturn(ByteCount.ofBytes(4));
SOURCE.start(buffer);

WebClient.of().execute(RequestHeaders.builder()
.scheme(SessionProtocol.HTTP)
.authority("127.0.0.1:21892")
.method(HttpMethod.POST)
.path("/opentelemetry.proto.collector.logs.v1.LogsService/Export")
.contentType(MediaType.JSON_UTF_8)
.build(),
HttpData.copyOf(JsonFormat.printer().print(LOGS_REQUEST).getBytes()))
.aggregate()
.whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, HttpStatus.REQUEST_ENTITY_TOO_LARGE, throwable))
.join();
}

static class BufferExceptionToStatusArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ public void start(Buffer<Record<ExportMetricsServiceRequest>> buffer) {
}

sb.requestTimeoutMillis(oTelMetricsSourceConfig.getRequestTimeoutInMillis());
if(oTelMetricsSourceConfig.getMaxRequestLength() != null) {
sb.maxRequestLength(oTelMetricsSourceConfig.getMaxRequestLength().getBytes());
}

// ACM Cert for SSL takes preference
if (oTelMetricsSourceConfig.isSsl() || oTelMetricsSourceConfig.useAcmCertForSSL()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Size;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.model.configuration.PluginModel;

Expand Down Expand Up @@ -103,6 +104,9 @@ public class OTelMetricsSourceConfig {
@JsonProperty(COMPRESSION)
private CompressionOption compression = CompressionOption.NONE;

@JsonProperty("max_request_length")
private ByteCount maxRequestLength;

@AssertTrue(message = "path should start with /")
boolean isPathValid() {
return path == null || path.startsWith("/");
Expand Down Expand Up @@ -220,5 +224,9 @@ public boolean isUnauthenticatedHealthCheck() {
public CompressionOption getCompression() {
return compression;
}

public ByteCount getMaxRequestLength() {
return maxRequestLength;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PipelineDescription;
Expand Down Expand Up @@ -979,6 +980,25 @@ void gRPC_request_returns_expected_status_for_exceptions_from_buffer(
assertThat(actualException.getStatus().getCode(), equalTo(expectedStatusCode));
}

@Test
void request_that_exceeds_maxRequestLength_returns_413() throws InvalidProtocolBufferException {
when(oTelMetricsSourceConfig.enableUnframedRequests()).thenReturn(true);
when(oTelMetricsSourceConfig.getMaxRequestLength()).thenReturn(ByteCount.ofBytes(4));
SOURCE.start(buffer);

WebClient.of().execute(RequestHeaders.builder()
.scheme(SessionProtocol.HTTP)
.authority("127.0.0.1:21891")
.method(HttpMethod.POST)
.path("/opentelemetry.proto.collector.metrics.v1.MetricsService/Export")
.contentType(MediaType.JSON_UTF_8)
.build(),
HttpData.copyOf(JsonFormat.printer().print(METRICS_REQUEST).getBytes()))
.aggregate()
.whenComplete((response, throwable) -> assertSecureResponseWithStatusCode(response, HttpStatus.REQUEST_ENTITY_TOO_LARGE, throwable))
.join();
}

static class BufferExceptionToStatusArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ public void start(Buffer<Record<Object>> buffer) {
}

sb.requestTimeoutMillis(oTelTraceSourceConfig.getRequestTimeoutInMillis());
if(oTelTraceSourceConfig.getMaxRequestLength() != null) {
sb.maxRequestLength(oTelTraceSourceConfig.getMaxRequestLength().getBytes());
}

// ACM Cert for SSL takes preference
if (oTelTraceSourceConfig.isSsl() || oTelTraceSourceConfig.useAcmCertForSSL()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Size;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -103,6 +104,9 @@ public class OTelTraceSourceConfig {
@JsonProperty(COMPRESSION)
private CompressionOption compression = CompressionOption.NONE;

@JsonProperty("max_request_length")
private ByteCount maxRequestLength;

@AssertTrue(message = "path should start with /")
boolean isPathValid() {
return path == null || path.startsWith("/");
Expand Down Expand Up @@ -220,4 +224,8 @@ public boolean isUnauthenticatedHealthCheck() {
public CompressionOption getCompression() {
return compression;
}

public ByteCount getMaxRequestLength() {
return maxRequestLength;
}
}
Loading

0 comments on commit b5f24c3

Please sign in to comment.