diff --git a/plugins/examples/stream-transport-example/BENCHMARK.md b/plugins/examples/stream-transport-example/BENCHMARK.md
new file mode 100644
index 0000000000000..6ae8e73a7970e
--- /dev/null
+++ b/plugins/examples/stream-transport-example/BENCHMARK.md
@@ -0,0 +1,168 @@
+# Benchmark API
+
+The Benchmark API allows you to compare the performance of Stream Transport (Arrow Flight) vs Regular Transport (Netty4) for node-to-node communication in OpenSearch.
+
+## Package
+
+`org.opensearch.example.stream.benchmark`
+
+## Endpoint
+
+```
+POST /_benchmark/stream
+```
+
+## Parameters
+
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `rows` | int | 100 | Number of rows per request |
+| `columns` | int | 10 | Number of columns per row |
+| `avg_column_length` | int | 100 | Average column length in bytes |
+| `parallel_requests` | int | 1 | Number of parallel requests to execute |
+| `total_requests` | int | 0 | Total requests to send (0 = use parallel_requests only) |
+| `batch_size` | int | 100 | Rows per batch for stream transport |
+| `use_stream_transport` | boolean | true | Use stream transport (true) or regular transport (false) |
+
+## Examples
+
+### Stream Transport (Arrow Flight)
+
+```bash
+curl -X POST "localhost:9200/_benchmark/stream?rows=100000&parallel_requests=10&total_requests=600&batch_size=10000&use_stream_transport=true&pretty&human=true"
+```
+
+**Response:**
+```json
+{
+  "total_rows" : 60000000,
+  "total_bytes" : 57970000000,
+  "total_size" : "53.9gb",
+  "total_size_bytes" : 57970000000,
+  "duration_ms" : 47730,
+  "throughput_rows_per_sec" : "1257071.02",
+  "throughput_mb_per_sec" : "1158.28",
+  "latency_ms" : {
+    "min" : 179,
+    "max" : 1862,
+    "avg" : 1578,
+    "p5" : 1134,
+    "p10" : 1425,
+    "p20" : 1481,
+    "p25" : 1495,
+    "p35" : 1532,
+    "p50" : 1622,
+    "p75" : 1717,
+    "p90" : 1791,
+    "p99" : 1838
+  },
+  "parallel_requests" : 10,
+  "used_stream_transport" : true
+}
+```
+
+### Regular Transport (Netty4)
+
+```bash
+curl -X POST "localhost:9200/_benchmark/stream?rows=100000&parallel_requests=10&total_requests=600&batch_size=10000&use_stream_transport=false&pretty&human=true"
+```
+
+**Response:**
+```json
+{
+  "total_rows" : 60000000,
+  "total_bytes" : 57970000000,
+  "total_size" : "53.9gb",
+  "total_size_bytes" : 57970000000,
+  "duration_ms" : 52100,
+  "throughput_rows_per_sec" : "1151632.57",
+  "throughput_mb_per_sec" : "1061.45",
+  "latency_ms" : {
+    "min" : 195,
+    "max" : 2012,
+    "avg" : 1723,
+    "p5" : 1245,
+    "p10" : 1567,
+    "p20" : 1623,
+    "p25" : 1645,
+    "p35" : 1689,
+    "p50" : 1756,
+    "p75" : 1834,
+    "p90" : 1923,
+    "p99" : 1987
+  },
+  "parallel_requests" : 10,
+  "used_stream_transport" : false
+}
+```
+
+## Response Fields
+
+| Field | Description |
+|-------|-------------|
+| `total_rows` | Total number of rows processed |
+| `total_bytes` | Total bytes transferred |
+| `total_size` | Human-readable size (with `human=true`) |
+| `duration_ms` | Total duration in milliseconds |
+| `throughput_rows_per_sec` | Rows processed per second |
+| `throughput_mb_per_sec` | Megabytes transferred per second |
+| `latency_ms.min` | Minimum request latency |
+| `latency_ms.max` | Maximum request latency |
+| `latency_ms.avg` | Average request latency |
+| `latency_ms.p5` - `latency_ms.p99` | Latency percentiles |
+| `parallel_requests` | Number of parallel requests used |
+| `used_stream_transport` | Whether stream transport was used |
+
+## Use Cases
+
+### 1. Compare Transport Performance
+
+```bash
+# Stream transport
+curl -X POST "localhost:9200/_benchmark/stream?rows=10000&parallel_requests=10&use_stream_transport=true&pretty"
+
+# Regular transport
+curl -X POST "localhost:9200/_benchmark/stream?rows=10000&parallel_requests=10&use_stream_transport=false&pretty"
+```
+
+### 2. Test High Throughput
+
+```bash
+curl -X POST "localhost:9200/_benchmark/stream?rows=100000&parallel_requests=50&total_requests=1000&batch_size=10000&use_stream_transport=true&pretty&human=true"
+```
+
+### 3. Test Large Payloads
+
+```bash
+curl -X POST "localhost:9200/_benchmark/stream?rows=50000&columns=50&avg_column_length=1000&parallel_requests=5&use_stream_transport=true&pretty&human=true"
+```
+
+### 4. Latency Analysis
+
+```bash
+curl -X POST "localhost:9200/_benchmark/stream?rows=1000&parallel_requests=100&total_requests=1000&use_stream_transport=true&pretty" | jq '.latency_ms'
+```
+
+## Performance Tips
+
+1. **Parallel Requests**: The API maintains a cushion of 2x `parallel_requests` in-flight to maximize throughput
+2. **Batch Size**: For stream transport, larger batch sizes (10000+) reduce overhead
+3. **Total Requests**: Use `total_requests` to send more requests than `parallel_requests` for sustained load testing
+4. **Network**: Run benchmarks on nodes with good network connectivity for accurate results
+
+## Implementation Details
+
+- **Batching**: Maintains 2x `parallel_requests` in-flight using a lock-free refill mechanism
+- **Thread Pools**: Uses dedicated `benchmark` and `benchmark_response` thread pools
+- **Data Generation**: Generates synthetic data with non-compressible patterns
+- **Latency Tracking**: Measures end-to-end latency including all batches for stream transport
+
+## Integration Tests
+
+See `BenchmarkStreamIT.java` for comprehensive test examples covering:
+- Basic benchmarks
+- Parallel requests
+- Stream vs regular transport comparison
+- Large payloads
+- Custom batch sizes
+- Latency percentile validation
diff --git a/plugins/examples/stream-transport-example/README.md b/plugins/examples/stream-transport-example/README.md
index c86c338cb6428..7095e0d771a9a 100644
--- a/plugins/examples/stream-transport-example/README.md
+++ b/plugins/examples/stream-transport-example/README.md
@@ -1,6 +1,23 @@
 # Stream Transport Example
 
-Step-by-step guide to implement streaming transport actions in OpenSearch.
+Example plugin demonstrating streaming transport actions and transport performance benchmarking.
+
+## Benchmark API
+
+Compare Flight (stream) vs Netty4 (regular) transport performance. See [BENCHMARK.md](BENCHMARK.md).
+
+```bash
+# Run tests
+./gradlew :example-plugins:stream-transport-example:internalClusterTest --tests "org.opensearch.example.stream.benchmark.BenchmarkStreamIT"
+
+# Compare transports
+curl -X POST "localhost:9200/_benchmark/stream?rows=1000&parallel_requests=10&use_stream_transport=true"
+curl -X POST "localhost:9200/_benchmark/stream?rows=1000&parallel_requests=10&use_stream_transport=false"
+```
+
+---
+
+## Streaming Transport Guide
 
 ## Step 1: Create Action Definition
diff --git a/plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/StreamTransportExampleIT.java b/plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/StreamTransportExampleIT.java
index 02a725dc4f731..3b7c1b5a1713c 100644
--- a/plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/StreamTransportExampleIT.java
+++ b/plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/StreamTransportExampleIT.java
@@ -72,7 +72,7 @@ public void handleException(TransportException exp) {
 
             @Override
             public String executor() {
-                return ThreadPool.Names.SAME;
+                return ThreadPool.Names.GENERIC;
             }
 
             @Override
diff --git a/plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/benchmark/BenchmarkStreamIT.java b/plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/benchmark/BenchmarkStreamIT.java
new file mode 100644
index 0000000000000..eeeecd67d2726
--- /dev/null
+++ b/plugins/examples/stream-transport-example/src/internalClusterTest/java/org/opensearch/example/stream/benchmark/BenchmarkStreamIT.java
@@ -0,0 +1,175 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.example.stream.benchmark;
+
+import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.opensearch.arrow.flight.transport.FlightStreamPlugin;
+import org.opensearch.cluster.health.ClusterHealthStatus;
+import org.opensearch.example.stream.StreamTransportExamplePlugin;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.opensearch.test.OpenSearchTestCase.LockFeatureFlag;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.opensearch.common.util.FeatureFlags.STREAM_TRANSPORT;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2)
+public class BenchmarkStreamIT extends OpenSearchIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return List.of(StreamTransportExamplePlugin.class, FlightStreamPlugin.class);
+    }
+
+    @LockFeatureFlag(STREAM_TRANSPORT)
+    public void testBasicBenchmark() {
+        ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForGreenStatus().get();
+        assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN));
+
+        BenchmarkStreamRequest request = new BenchmarkStreamRequest();
+        request.setRows(10);
+        request.setColumns(5);
+        request.setAvgColumnLength(100);
+        request.setParallelRequests(1);
+        request.setUseStreamTransport(true);
+
+        BenchmarkStreamResponse response = client().execute(BenchmarkStreamAction.INSTANCE, request).actionGet();
+
+
assertThat(response.getTotalRows(), equalTo(10L)); + assertThat(response.getTotalBytes(), greaterThan(0L)); + assertThat(response.getDurationMs(), greaterThan(0L)); + assertThat(response.getThroughputRowsPerSec(), greaterThan(0.0)); + assertThat(response.getMinLatencyMs(), greaterThan(0L)); + assertThat(response.getMaxLatencyMs(), greaterThanOrEqualTo(response.getMinLatencyMs())); + assertThat(response.getParallelRequests(), equalTo(1)); + assertTrue(response.isUsedStreamTransport()); + } + + @LockFeatureFlag(STREAM_TRANSPORT) + public void testParallelRequests() { + BenchmarkStreamRequest request = new BenchmarkStreamRequest(); + request.setRows(50); + request.setColumns(10); + request.setParallelRequests(5); + request.setUseStreamTransport(true); + + BenchmarkStreamResponse response = client().execute(BenchmarkStreamAction.INSTANCE, request).actionGet(); + + assertThat(response.getTotalRows(), equalTo(250L)); // 50 rows × 5 parallel + assertThat(response.getParallelRequests(), equalTo(5)); + assertThat(response.getP90LatencyMs(), greaterThan(0L)); + assertThat(response.getP99LatencyMs(), greaterThanOrEqualTo(response.getP90LatencyMs())); + } + + @LockFeatureFlag(STREAM_TRANSPORT) + public void testStreamVsRegularTransport() { + // Test with stream transport + BenchmarkStreamRequest streamRequest = new BenchmarkStreamRequest(); + streamRequest.setRows(100); + streamRequest.setColumns(10); + streamRequest.setParallelRequests(2); + streamRequest.setUseStreamTransport(true); + + BenchmarkStreamResponse streamResponse = client().execute(BenchmarkStreamAction.INSTANCE, streamRequest).actionGet(); + assertTrue(streamResponse.isUsedStreamTransport()); + assertThat(streamResponse.getTotalRows(), equalTo(200L)); + + // Test with regular transport + BenchmarkStreamRequest regularRequest = new BenchmarkStreamRequest(); + regularRequest.setRows(100); + regularRequest.setColumns(10); + regularRequest.setParallelRequests(2); + regularRequest.setUseStreamTransport(false); + + BenchmarkStreamResponse regularResponse = client().execute(BenchmarkStreamAction.INSTANCE, regularRequest).actionGet(); + assertFalse(regularResponse.isUsedStreamTransport()); + assertThat(regularResponse.getTotalRows(), equalTo(200L)); + + // Both should produce results + assertThat(streamResponse.getThroughputRowsPerSec(), greaterThan(0.0)); + assertThat(regularResponse.getThroughputRowsPerSec(), greaterThan(0.0)); + } + + @LockFeatureFlag(STREAM_TRANSPORT) + public void testLargePayload() { + BenchmarkStreamRequest request = new BenchmarkStreamRequest(); + request.setRows(1000); + request.setColumns(20); + request.setAvgColumnLength(500); + request.setParallelRequests(3); + + BenchmarkStreamResponse response = client().execute(BenchmarkStreamAction.INSTANCE, request).actionGet(); + + assertThat(response.getTotalRows(), equalTo(3000L)); + assertThat(response.getTotalBytes(), greaterThan(1_000_000L)); // > 1MB + assertThat(response.getThroughputMbPerSec(), greaterThan(0.0)); + } + + @LockFeatureFlag(STREAM_TRANSPORT) + public void testValidation() { + BenchmarkStreamRequest request = new BenchmarkStreamRequest(); + request.setRows(-1); + + try { + client().execute(BenchmarkStreamAction.INSTANCE, request).actionGet(); + fail("Expected validation exception"); + } catch (Exception e) { + assertTrue(e.getMessage().contains("rows must be > 0")); + } + } + + @LockFeatureFlag(STREAM_TRANSPORT) + public void testThreadPoolConfiguration() { + BenchmarkStreamRequest request = new BenchmarkStreamRequest(); + request.setRows(10); + 
request.setThreadPool("generic"); + request.setParallelRequests(1); + + BenchmarkStreamResponse response = client().execute(BenchmarkStreamAction.INSTANCE, request).actionGet(); + assertThat(response.getTotalRows(), equalTo(10L)); + } + + @LockFeatureFlag(STREAM_TRANSPORT) + public void testLatencyPercentiles() { + BenchmarkStreamRequest request = new BenchmarkStreamRequest(); + request.setRows(50); + request.setParallelRequests(10); + + BenchmarkStreamResponse response = client().execute(BenchmarkStreamAction.INSTANCE, request).actionGet(); + + // Verify percentile ordering + assertThat(response.getMinLatencyMs(), lessThanOrEqualTo(response.getAvgLatencyMs())); + assertThat(response.getAvgLatencyMs(), lessThanOrEqualTo(response.getP90LatencyMs())); + assertThat(response.getP90LatencyMs(), lessThanOrEqualTo(response.getP99LatencyMs())); + assertThat(response.getP99LatencyMs(), lessThanOrEqualTo(response.getMaxLatencyMs())); + } + + @LockFeatureFlag(STREAM_TRANSPORT) + public void testCustomBatchSize() { + BenchmarkStreamRequest smallBatchRequest = new BenchmarkStreamRequest(); + smallBatchRequest.setRows(100); + smallBatchRequest.setBatchSize(10); + smallBatchRequest.setUseStreamTransport(true); + + BenchmarkStreamResponse smallBatchResponse = client().execute(BenchmarkStreamAction.INSTANCE, smallBatchRequest).actionGet(); + assertThat(smallBatchResponse.getTotalRows(), equalTo(100L)); + assertTrue(smallBatchResponse.isUsedStreamTransport()); + + BenchmarkStreamRequest largeBatchRequest = new BenchmarkStreamRequest(); + largeBatchRequest.setRows(100); + largeBatchRequest.setBatchSize(500); + largeBatchRequest.setUseStreamTransport(true); + + BenchmarkStreamResponse largeBatchResponse = client().execute(BenchmarkStreamAction.INSTANCE, largeBatchRequest).actionGet(); + assertThat(largeBatchResponse.getTotalRows(), equalTo(100L)); + assertTrue(largeBatchResponse.isUsedStreamTransport()); + } +} diff --git a/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/StreamTransportExamplePlugin.java b/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/StreamTransportExamplePlugin.java index 94ea2d1fa8231..7568e57f3364f 100644 --- a/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/StreamTransportExamplePlugin.java +++ b/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/StreamTransportExamplePlugin.java @@ -9,25 +9,86 @@ package org.opensearch.example.stream; import org.opensearch.action.ActionRequest; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; import org.opensearch.core.action.ActionResponse; +import org.opensearch.example.stream.benchmark.BenchmarkStreamAction; +import org.opensearch.example.stream.benchmark.RestBenchmarkStreamAction; +import org.opensearch.example.stream.benchmark.TransportBenchmarkStreamAction; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; +import org.opensearch.rest.RestController; +import org.opensearch.rest.RestHandler; +import org.opensearch.threadpool.ExecutorBuilder; +import org.opensearch.threadpool.FixedExecutorBuilder; -import java.util.Collections; +import java.util.Arrays; import java.util.List; +import 
java.util.function.Supplier; /** * Example plugin demonstrating streaming transport actions */ public class StreamTransportExamplePlugin extends Plugin implements ActionPlugin { + /** Benchmark thread pool name */ + public static final String BENCHMARK_THREAD_POOL_NAME = "benchmark"; + /** Benchmark response handler thread pool name */ + public static final String BENCHMARK_RESPONSE_POOL_NAME = "benchmark_response"; + /** * Constructor */ public StreamTransportExamplePlugin() {} + @Override + public List> getExecutorBuilders(Settings settings) { + int processors = Runtime.getRuntime().availableProcessors(); + int benchmarkSize = settings.getAsInt("thread_pool." + BENCHMARK_THREAD_POOL_NAME + ".size", processors * 8); + int benchmarkQueue = settings.getAsInt("thread_pool." + BENCHMARK_THREAD_POOL_NAME + ".queue_size", 1000); + int responseSize = settings.getAsInt("thread_pool." + BENCHMARK_RESPONSE_POOL_NAME + ".size", processors * 8); + int responseQueue = settings.getAsInt("thread_pool." + BENCHMARK_RESPONSE_POOL_NAME + ".queue_size", 1000); + + return Arrays.asList( + new FixedExecutorBuilder( + settings, + BENCHMARK_THREAD_POOL_NAME, + benchmarkSize, + benchmarkQueue, + "thread_pool." + BENCHMARK_THREAD_POOL_NAME + ), + new FixedExecutorBuilder( + settings, + BENCHMARK_RESPONSE_POOL_NAME, + responseSize, + responseQueue, + "thread_pool." + BENCHMARK_RESPONSE_POOL_NAME + ) + ); + } + @Override public List> getActions() { - return Collections.singletonList(new ActionHandler<>(StreamDataAction.INSTANCE, TransportStreamDataAction.class)); + return Arrays.asList( + new ActionHandler<>(StreamDataAction.INSTANCE, TransportStreamDataAction.class), + new ActionHandler<>(BenchmarkStreamAction.INSTANCE, TransportBenchmarkStreamAction.class) + ); + } + + @Override + public List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + return List.of(new RestBenchmarkStreamAction()); } } diff --git a/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/TransportStreamDataAction.java b/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/TransportStreamDataAction.java index d31e78477f3da..4225abfd8dd6a 100644 --- a/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/TransportStreamDataAction.java +++ b/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/TransportStreamDataAction.java @@ -12,12 +12,14 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.TransportAction; +import org.opensearch.common.Nullable; import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.StreamTransportService; import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportService; import org.opensearch.transport.stream.StreamErrorCode; import org.opensearch.transport.stream.StreamException; @@ -31,21 +33,32 @@ public class TransportStreamDataAction extends TransportAction { + /** Singleton instance */ + public static final BenchmarkStreamAction INSTANCE = new BenchmarkStreamAction(); + /** Action name */ + public static final 
String NAME = "cluster:admin/benchmark_stream"; + + private BenchmarkStreamAction() { + super(NAME, BenchmarkStreamResponse::new); + } +} diff --git a/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/benchmark/BenchmarkStreamRequest.java b/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/benchmark/BenchmarkStreamRequest.java new file mode 100644 index 0000000000000..da2ca0caca9ed --- /dev/null +++ b/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/benchmark/BenchmarkStreamRequest.java @@ -0,0 +1,172 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.example.stream.benchmark; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Request for benchmark stream action + */ +public class BenchmarkStreamRequest extends ActionRequest { + + private int rows = 100; + private int columns = 10; + private int avgColumnLength = 100; + private String columnType = "string"; + private boolean useStreamTransport = true; + private int parallelRequests = 1; + private int totalRequests = 0; + private int targetTps = 0; + private String threadPool = "generic"; + private int batchSize = 100; + + /** Constructor */ + public BenchmarkStreamRequest() {} + + /** + * Constructor from stream input + * @param in stream input + * @throws IOException if an I/O error occurs + */ + public BenchmarkStreamRequest(StreamInput in) throws IOException { + super(in); + rows = in.readVInt(); + columns = in.readVInt(); + avgColumnLength = in.readVInt(); + columnType = in.readString(); + useStreamTransport = in.readBoolean(); + parallelRequests = in.readVInt(); + totalRequests = in.readVInt(); + targetTps = in.readVInt(); + threadPool = in.readString(); + batchSize = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(rows); + out.writeVInt(columns); + out.writeVInt(avgColumnLength); + out.writeString(columnType); + out.writeBoolean(useStreamTransport); + out.writeVInt(parallelRequests); + out.writeVInt(totalRequests); + out.writeVInt(targetTps); + out.writeString(threadPool); + out.writeVInt(batchSize); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (rows <= 0) { + validationException = addValidationError("rows must be > 0", validationException); + } + if (columns <= 0) { + validationException = addValidationError("columns must be > 0", validationException); + } + if (parallelRequests <= 0) { + validationException = addValidationError("parallel_requests must be > 0", validationException); + } + if (batchSize <= 0) { + validationException = addValidationError("batch_size must be > 0", validationException); + } + return validationException; + } + + private ActionRequestValidationException addValidationError(String error, ActionRequestValidationException exception) { + if (exception == null) { + exception = new ActionRequestValidationException(); + } + exception.addValidationError(error); + return exception; + } + + int getRows() { + return rows; + } + + void setRows(int rows) { + this.rows = rows; + } + + int getColumns() { + return columns; + } + + void setColumns(int columns) { + this.columns = columns; + } + + 
int getAvgColumnLength() { + return avgColumnLength; + } + + void setAvgColumnLength(int avgColumnLength) { + this.avgColumnLength = avgColumnLength; + } + + String getColumnType() { + return columnType; + } + + void setColumnType(String columnType) { + this.columnType = columnType; + } + + boolean isUseStreamTransport() { + return useStreamTransport; + } + + void setUseStreamTransport(boolean useStreamTransport) { + this.useStreamTransport = useStreamTransport; + } + + int getParallelRequests() { + return parallelRequests; + } + + void setParallelRequests(int parallelRequests) { + this.parallelRequests = parallelRequests; + } + + int getTotalRequests() { + return totalRequests; + } + + void setTotalRequests(int totalRequests) { + this.totalRequests = totalRequests; + } + + int getTargetTps() { + return targetTps; + } + + void setTargetTps(int targetTps) { + this.targetTps = targetTps; + } + + String getThreadPool() { + return threadPool; + } + + void setThreadPool(String threadPool) { + this.threadPool = threadPool; + } + + int getBatchSize() { + return batchSize; + } + + void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } +} diff --git a/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/benchmark/BenchmarkStreamResponse.java b/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/benchmark/BenchmarkStreamResponse.java new file mode 100644 index 0000000000000..220ffc2d864c1 --- /dev/null +++ b/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/benchmark/BenchmarkStreamResponse.java @@ -0,0 +1,515 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.example.stream.benchmark; + +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Response for benchmark stream action + */ +public class BenchmarkStreamResponse extends ActionResponse implements ToXContentObject { + + private long totalRows; + private long totalBytes; + private long durationMs; + private double throughputRowsPerSec; + private double throughputMbPerSec; + private long minLatencyMs; + private long maxLatencyMs; + private long avgLatencyMs; + private long p5LatencyMs; + private long p10LatencyMs; + private long p20LatencyMs; + private long p25LatencyMs; + private long p35LatencyMs; + private long p50LatencyMs; + private long p75LatencyMs; + private long p90LatencyMs; + private long p99LatencyMs; + private int parallelRequests; + private boolean usedStreamTransport; + private ThreadPoolStats threadPoolStats; + private java.util.Map flightTiming; + private byte[] payload; + + /** + * Constructor from stream input + * @param in stream input + * @throws IOException if an I/O error occurs + */ + public BenchmarkStreamResponse(StreamInput in) throws IOException { + super(in); + totalRows = in.readVLong(); + totalBytes = in.readVLong(); + durationMs = in.readVLong(); + throughputRowsPerSec = in.readDouble(); + throughputMbPerSec = in.readDouble(); + minLatencyMs = in.readVLong(); + maxLatencyMs = in.readVLong(); + avgLatencyMs = in.readVLong(); + p5LatencyMs = in.readVLong(); + p10LatencyMs = in.readVLong(); + p20LatencyMs = in.readVLong(); + p25LatencyMs = in.readVLong(); + 
p35LatencyMs = in.readVLong(); + p50LatencyMs = in.readVLong(); + p75LatencyMs = in.readVLong(); + p90LatencyMs = in.readVLong(); + p99LatencyMs = in.readVLong(); + parallelRequests = in.readVInt(); + usedStreamTransport = in.readBoolean(); + if (in.readBoolean()) { + threadPoolStats = new ThreadPoolStats(in); + } + if (in.readBoolean()) { + flightTiming = in.readMap(); + } + if (in.readBoolean()) { + payload = in.readByteArray(); + } + } + + /** + * Constructor with all fields + * @param totalRows total rows + * @param totalBytes total bytes + * @param durationMs duration in milliseconds + * @param throughputRowsPerSec throughput in rows per second + * @param throughputMbPerSec throughput in MB per second + * @param minLatencyMs minimum latency in milliseconds + * @param maxLatencyMs maximum latency in milliseconds + * @param avgLatencyMs average latency in milliseconds + * @param p5LatencyMs p5 latency in milliseconds + * @param p10LatencyMs p10 latency in milliseconds + * @param p20LatencyMs p20 latency in milliseconds + * @param p25LatencyMs p25 latency in milliseconds + * @param p35LatencyMs p35 latency in milliseconds + * @param p50LatencyMs p50 latency in milliseconds + * @param p75LatencyMs p75 latency in milliseconds + * @param p90LatencyMs p90 latency in milliseconds + * @param p99LatencyMs p99 latency in milliseconds + * @param parallelRequests parallel requests + * @param usedStreamTransport whether stream transport was used + * @param threadPoolStats thread pool statistics + * @param flightTiming flight timing metrics + * @param payload payload data + */ + public BenchmarkStreamResponse( + long totalRows, + long totalBytes, + long durationMs, + double throughputRowsPerSec, + double throughputMbPerSec, + long minLatencyMs, + long maxLatencyMs, + long avgLatencyMs, + long p5LatencyMs, + long p10LatencyMs, + long p20LatencyMs, + long p25LatencyMs, + long p35LatencyMs, + long p50LatencyMs, + long p75LatencyMs, + long p90LatencyMs, + long p99LatencyMs, + int parallelRequests, + boolean usedStreamTransport, + ThreadPoolStats threadPoolStats, + java.util.Map flightTiming, + byte[] payload + ) { + this.totalRows = totalRows; + this.totalBytes = totalBytes; + this.durationMs = durationMs; + this.throughputRowsPerSec = throughputRowsPerSec; + this.throughputMbPerSec = throughputMbPerSec; + this.minLatencyMs = minLatencyMs; + this.maxLatencyMs = maxLatencyMs; + this.avgLatencyMs = avgLatencyMs; + this.p5LatencyMs = p5LatencyMs; + this.p10LatencyMs = p10LatencyMs; + this.p20LatencyMs = p20LatencyMs; + this.p25LatencyMs = p25LatencyMs; + this.p35LatencyMs = p35LatencyMs; + this.p50LatencyMs = p50LatencyMs; + this.p75LatencyMs = p75LatencyMs; + this.p90LatencyMs = p90LatencyMs; + this.p99LatencyMs = p99LatencyMs; + this.parallelRequests = parallelRequests; + this.usedStreamTransport = usedStreamTransport; + this.threadPoolStats = threadPoolStats; + this.flightTiming = flightTiming; + this.payload = payload; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(totalRows); + out.writeVLong(totalBytes); + out.writeVLong(durationMs); + out.writeDouble(throughputRowsPerSec); + out.writeDouble(throughputMbPerSec); + out.writeVLong(minLatencyMs); + out.writeVLong(maxLatencyMs); + out.writeVLong(avgLatencyMs); + out.writeVLong(p5LatencyMs); + out.writeVLong(p10LatencyMs); + out.writeVLong(p20LatencyMs); + out.writeVLong(p25LatencyMs); + out.writeVLong(p35LatencyMs); + out.writeVLong(p50LatencyMs); + out.writeVLong(p75LatencyMs); + 
out.writeVLong(p90LatencyMs); + out.writeVLong(p99LatencyMs); + out.writeVInt(parallelRequests); + out.writeBoolean(usedStreamTransport); + out.writeBoolean(threadPoolStats != null); + if (threadPoolStats != null) { + threadPoolStats.writeTo(out); + } + out.writeBoolean(flightTiming != null); + if (flightTiming != null) { + out.writeMap(flightTiming); + } + out.writeBoolean(payload != null); + if (payload != null) { + out.writeByteArray(payload); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject() + .field("total_rows", totalRows) + .field("total_bytes", totalBytes) + .humanReadableField("total_size_bytes", "total_size", new ByteSizeValue(totalBytes)) + .field("duration_ms", durationMs) + .field("throughput_rows_per_sec", String.format(java.util.Locale.ROOT, "%.2f", throughputRowsPerSec)) + .field("throughput_mb_per_sec", String.format(java.util.Locale.ROOT, "%.2f", throughputMbPerSec)) + .startObject("latency_ms") + .field("min", minLatencyMs) + .field("max", maxLatencyMs) + .field("avg", avgLatencyMs) + .field("p5", p5LatencyMs) + .field("p10", p10LatencyMs) + .field("p20", p20LatencyMs) + .field("p25", p25LatencyMs) + .field("p35", p35LatencyMs) + .field("p50", p50LatencyMs) + .field("p75", p75LatencyMs) + .field("p90", p90LatencyMs) + .field("p99", p99LatencyMs) + .endObject() + .field("parallel_requests", parallelRequests) + .field("used_stream_transport", usedStreamTransport); + if (threadPoolStats != null) { + builder.field("thread_pool_stats", threadPoolStats); + } + if (flightTiming != null && !flightTiming.isEmpty()) { + builder.field("flight_timing", flightTiming); + } + return builder.endObject(); + } + + /** + * Get total rows + * @return total rows + */ + public long getTotalRows() { + return totalRows; + } + + /** + * Get total bytes + * @return total bytes + */ + public long getTotalBytes() { + return totalBytes; + } + + /** + * Get duration in milliseconds + * @return duration in milliseconds + */ + public long getDurationMs() { + return durationMs; + } + + /** + * Get throughput in rows per second + * @return throughput in rows per second + */ + public double getThroughputRowsPerSec() { + return throughputRowsPerSec; + } + + /** + * Get throughput in MB per second + * @return throughput in MB per second + */ + public double getThroughputMbPerSec() { + return throughputMbPerSec; + } + + /** + * Get minimum latency in milliseconds + * @return minimum latency in milliseconds + */ + public long getMinLatencyMs() { + return minLatencyMs; + } + + /** + * Get maximum latency in milliseconds + * @return maximum latency in milliseconds + */ + public long getMaxLatencyMs() { + return maxLatencyMs; + } + + /** + * Get average latency in milliseconds + * @return average latency in milliseconds + */ + public long getAvgLatencyMs() { + return avgLatencyMs; + } + + /** + * Get p5 latency in milliseconds + * @return p5 latency in milliseconds + */ + public long getP5LatencyMs() { + return p5LatencyMs; + } + + /** + * Get p10 latency in milliseconds + * @return p10 latency in milliseconds + */ + public long getP10LatencyMs() { + return p10LatencyMs; + } + + /** + * Get p20 latency in milliseconds + * @return p20 latency in milliseconds + */ + public long getP20LatencyMs() { + return p20LatencyMs; + } + + /** + * Get p25 latency in milliseconds + * @return p25 latency in milliseconds + */ + public long getP25LatencyMs() { + return p25LatencyMs; + } + + /** + * Get p35 latency in milliseconds + * 
@return p35 latency in milliseconds + */ + public long getP35LatencyMs() { + return p35LatencyMs; + } + + /** + * Get p50 latency in milliseconds + * @return p50 latency in milliseconds + */ + public long getP50LatencyMs() { + return p50LatencyMs; + } + + /** + * Get p75 latency in milliseconds + * @return p75 latency in milliseconds + */ + public long getP75LatencyMs() { + return p75LatencyMs; + } + + /** + * Get p90 latency in milliseconds + * @return p90 latency in milliseconds + */ + public long getP90LatencyMs() { + return p90LatencyMs; + } + + /** + * Get p99 latency in milliseconds + * @return p99 latency in milliseconds + */ + public long getP99LatencyMs() { + return p99LatencyMs; + } + + /** + * Get parallel requests + * @return parallel requests + */ + public int getParallelRequests() { + return parallelRequests; + } + + /** + * Check if stream transport was used + * @return whether stream transport was used + */ + public boolean isUsedStreamTransport() { + return usedStreamTransport; + } + + /** + * Get flight timing metrics + * @return flight timing metrics + */ + public java.util.Map getFlightTiming() { + return flightTiming; + } + + /** + * Thread pool statistics + */ + public static class ThreadPoolStats implements ToXContentObject { + private final java.util.List pools; + + /** + * Constructor + * @param pools list of pool statistics + */ + public ThreadPoolStats(java.util.List pools) { + this.pools = pools; + } + + /** + * Constructor from stream input + * @param in stream input + * @throws IOException if an I/O error occurs + */ + public ThreadPoolStats(StreamInput in) throws IOException { + this.pools = in.readList(PoolStat::new); + } + + /** + * Write to stream output + * @param out stream output + * @throws IOException if an I/O error occurs + */ + public void writeTo(StreamOutput out) throws IOException { + out.writeList(pools); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + for (PoolStat pool : pools) { + builder.field(pool.name, pool); + } + return builder.endObject(); + } + } + + /** + * Individual pool statistics + */ + public static class PoolStat implements ToXContentObject, org.opensearch.core.common.io.stream.Writeable { + private final String name; + private final long queueSizeDiff; + private final long completedDiff; + private final int maxActive; + private final long waitTimeMs; + private final int currentActive; + private final int currentQueue; + private final int[] eventLoopPending; + + /** + * Constructor + * @param name pool name + * @param queueSizeDiff queue size difference + * @param completedDiff completed tasks difference + * @param maxActive maximum active threads + * @param waitTimeMs total wait time in milliseconds + * @param currentActive current active threads + * @param currentQueue current queue size + * @param eventLoopPending event loop pending tasks (for flight transport) + */ + public PoolStat( + String name, + long queueSizeDiff, + long completedDiff, + int maxActive, + long waitTimeMs, + int currentActive, + int currentQueue, + int[] eventLoopPending + ) { + this.name = name; + this.queueSizeDiff = queueSizeDiff; + this.completedDiff = completedDiff; + this.maxActive = maxActive; + this.waitTimeMs = waitTimeMs; + this.currentActive = currentActive; + this.currentQueue = currentQueue; + this.eventLoopPending = eventLoopPending; + } + + /** + * Constructor from stream input + * @param in stream input + * @throws IOException if an I/O error 
occurs + */ + public PoolStat(StreamInput in) throws IOException { + this.name = in.readString(); + this.queueSizeDiff = in.readVLong(); + this.completedDiff = in.readVLong(); + this.maxActive = in.readVInt(); + this.waitTimeMs = in.readVLong(); + this.currentActive = in.readVInt(); + this.currentQueue = in.readVInt(); + this.eventLoopPending = in.readBoolean() ? in.readIntArray() : null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + out.writeVLong(queueSizeDiff); + out.writeVLong(completedDiff); + out.writeVInt(maxActive); + out.writeVLong(waitTimeMs); + out.writeVInt(currentActive); + out.writeVInt(currentQueue); + out.writeBoolean(eventLoopPending != null); + if (eventLoopPending != null) { + out.writeIntArray(eventLoopPending); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject() + .field("queue_size_diff", queueSizeDiff) + .field("completed_diff", completedDiff) + .field("max_active", maxActive) + .field("wait_time_ms", waitTimeMs) + .field("current_active", currentActive) + .field("current_queue", currentQueue); + if (eventLoopPending != null) { + builder.field("event_loop_pending", eventLoopPending); + } + return builder.endObject(); + } + } +} diff --git a/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/benchmark/RestBenchmarkStreamAction.java b/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/benchmark/RestBenchmarkStreamAction.java new file mode 100644 index 0000000000000..feaca26b15795 --- /dev/null +++ b/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/benchmark/RestBenchmarkStreamAction.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.example.stream.benchmark; + +import org.opensearch.example.stream.StreamTransportExamplePlugin; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; +import org.opensearch.transport.client.node.NodeClient; + +import java.util.List; + +import static org.opensearch.rest.RestRequest.Method.POST; + +/** + * REST handler for benchmark stream action + */ +public class RestBenchmarkStreamAction extends BaseRestHandler { + + /** Constructor */ + public RestBenchmarkStreamAction() {} + + @Override + public String getName() { + return "benchmark_stream_action"; + } + + @Override + public List routes() { + return List.of(new Route(POST, "/_benchmark/stream")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + BenchmarkStreamRequest benchmarkRequest = new BenchmarkStreamRequest(); + + benchmarkRequest.setRows(request.paramAsInt("rows", 100)); + benchmarkRequest.setColumns(request.paramAsInt("columns", 10)); + benchmarkRequest.setAvgColumnLength(request.paramAsInt("avg_column_length", 100)); + benchmarkRequest.setColumnType(request.param("column_type", "string")); + benchmarkRequest.setUseStreamTransport(request.paramAsBoolean("use_stream_transport", true)); + benchmarkRequest.setParallelRequests(request.paramAsInt("parallel_requests", 1)); + benchmarkRequest.setTotalRequests(request.paramAsInt("total_requests", 0)); + benchmarkRequest.setTargetTps(request.paramAsInt("target_tps", 0)); + benchmarkRequest.setThreadPool(request.param("thread_pool", StreamTransportExamplePlugin.BENCHMARK_THREAD_POOL_NAME)); + 
benchmarkRequest.setBatchSize(request.paramAsInt("batch_size", 100)); + + return channel -> client.execute(BenchmarkStreamAction.INSTANCE, benchmarkRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/benchmark/TransportBenchmarkStreamAction.java b/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/benchmark/TransportBenchmarkStreamAction.java new file mode 100644 index 0000000000000..6e0eef56616f1 --- /dev/null +++ b/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/benchmark/TransportBenchmarkStreamAction.java @@ -0,0 +1,550 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.example.stream.benchmark; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.TransportAction; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Nullable; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.example.stream.StreamTransportExamplePlugin; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.threadpool.ThreadPoolStats.Stats; +import org.opensearch.transport.StreamTransportResponseHandler; +import org.opensearch.transport.StreamTransportService; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportRequestOptions; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; +import org.opensearch.transport.stream.StreamTransportResponse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Transport action for benchmarking stream transport performance + */ +public class TransportBenchmarkStreamAction extends TransportAction { + + private static final Logger logger = LogManager.getLogger(TransportBenchmarkStreamAction.class); + private static final String SHARD_ACTION_NAME = BenchmarkStreamAction.NAME + "[s]"; + + private static final double BYTES_TO_MB = 1024.0 * 1024.0; + private static final double MS_TO_SEC = 1000.0; + + private final ClusterService clusterService; + private final ThreadPool threadPool; + private final TransportService transportService; + private final StreamTransportService streamTransportService; + + /** + * Constructor + * @param clusterService cluster service + * @param transportService transport service + * @param streamTransportService stream transport service + * @param actionFilters action filters + * @param threadPool thread pool + */ + @Inject + public TransportBenchmarkStreamAction( + ClusterService clusterService, + TransportService transportService, + @Nullable StreamTransportService streamTransportService, + ActionFilters actionFilters, + ThreadPool threadPool + ) { + super(BenchmarkStreamAction.NAME, actionFilters, transportService.getTaskManager()); + 
this.clusterService = clusterService; + this.threadPool = threadPool; + this.transportService = transportService; + this.streamTransportService = streamTransportService; + + // Register shard-level handler on regular transport + transportService.registerRequestHandler( + SHARD_ACTION_NAME, + StreamTransportExamplePlugin.BENCHMARK_THREAD_POOL_NAME, + false, + false, + BenchmarkStreamRequest::new, + (request, channel, task) -> handleRegularTransportRequest(request, channel) + ); + + // Register handler on stream transport if available + if (streamTransportService != null) { + streamTransportService.registerRequestHandler( + SHARD_ACTION_NAME, + StreamTransportExamplePlugin.BENCHMARK_THREAD_POOL_NAME, + BenchmarkStreamRequest::new, + (request, channel, task) -> handleStreamTransportRequest(request, channel) + ); + } + } + + @Override + protected void doExecute(Task task, BenchmarkStreamRequest request, ActionListener listener) { + threadPool.executor(StreamTransportExamplePlugin.BENCHMARK_THREAD_POOL_NAME).execute(() -> { + try { + BenchmarkStreamResponse response = executeBenchmark(request); + listener.onResponse(response); + } catch (Exception e) { + listener.onFailure(e); + } + }); + } + + private BenchmarkStreamResponse executeBenchmark(BenchmarkStreamRequest request) throws Exception { + DiscoveryNode[] dataNodes = getAvailableNodes(); + int totalRequests = request.getTotalRequests() > 0 ? request.getTotalRequests() : request.getParallelRequests(); + int maxInFlight = request.getParallelRequests() * 2; + + BenchmarkContext ctx = new BenchmarkContext( + totalRequests, + maxInFlight, + dataNodes, + request.isUseStreamTransport() && streamTransportService != null + ); + + long startTime = System.currentTimeMillis(); + + // Send initial batch up to 2x parallel limit + int initialBatch = Math.min(maxInFlight, totalRequests); + for (int i = 0; i < initialBatch; i++) { + sendRequest(ctx, request, i); + } + + ctx.latch.await(); + long durationMs = System.currentTimeMillis() - startTime; + + return calculateStats( + ctx.totalRows.get(), + ctx.totalBytes.get(), + durationMs, + ctx.latencies, + request.getParallelRequests(), + ctx.useStream + ); + } + + private void sendRequest(BenchmarkContext ctx, BenchmarkStreamRequest request, int index) { + DiscoveryNode targetNode = ctx.dataNodes[index % ctx.dataNodes.length]; + ctx.activeRequests.incrementAndGet(); + ctx.sentRequests.incrementAndGet(); + + long requestStart = System.nanoTime(); + Runnable onComplete = () -> { + ctx.activeRequests.decrementAndGet(); + tryRefill(ctx, request); + }; + + if (ctx.useStream) { + sendStreamRequest(targetNode, request, requestStart, ctx, onComplete); + } else { + sendRegularRequest(targetNode, request, requestStart, ctx, onComplete); + } + } + + private void tryRefill(BenchmarkContext ctx, BenchmarkStreamRequest request) { + if (ctx.refilling.compareAndSet(0, 1)) { + try { + while (ctx.sentRequests.get() < ctx.totalRequests && ctx.activeRequests.get() < ctx.maxInFlight) { + long nextIndex = ctx.sentRequests.getAndIncrement(); + if (nextIndex < ctx.totalRequests) { + DiscoveryNode node = ctx.dataNodes[(int) (nextIndex % ctx.dataNodes.length)]; + ctx.activeRequests.incrementAndGet(); + long start = System.nanoTime(); + Runnable onComplete = () -> { + ctx.activeRequests.decrementAndGet(); + tryRefill(ctx, request); + }; + if (ctx.useStream) { + sendStreamRequest(node, request, start, ctx, onComplete); + } else { + sendRegularRequest(node, request, start, ctx, onComplete); + } + } else { + break; + } + } + } finally { + 
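+                // release the single-refiller guard so completions that finish later can re-enter tryRefill and top up in-flight requests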
ctx.refilling.set(0); + } + } + } + + private DiscoveryNode[] getAvailableNodes() { + DiscoveryNode[] dataNodes = clusterService.state().nodes().getDataNodes().values().toArray(DiscoveryNode[]::new); + if (dataNodes.length == 0) { + dataNodes = clusterService.state().nodes().getNodes().values().toArray(DiscoveryNode[]::new); + } + if (dataNodes.length == 0) { + throw new IllegalStateException("No nodes available"); + } + return dataNodes; + } + + private void handleRegularTransportRequest(BenchmarkStreamRequest request, TransportChannel channel) { + try { + long bytes = calculateTotalBytes(request); + byte[] payload = generateSyntheticData(bytes); + channel.sendResponse(new BenchmarkDataResponse(payload)); + } catch (Exception e) { + try { + channel.sendResponse(e); + } catch (IOException ioException) { + logger.error("Failed to send error response", ioException); + } + } + } + + private void handleStreamTransportRequest(BenchmarkStreamRequest request, TransportChannel channel) { + try { + int totalRows = request.getRows(); + int batchSize = request.getBatchSize(); + long bytesPerRow = calculateBytesPerRow(request); + + for (int rowsSent = 0; rowsSent < totalRows; rowsSent += batchSize) { + int rowsInBatch = Math.min(batchSize, totalRows - rowsSent); + long bytesInBatch = rowsInBatch * bytesPerRow; + + byte[] payload = generateSyntheticData(bytesInBatch); + BenchmarkDataResponse response = new BenchmarkDataResponse(payload); + channel.sendResponseBatch(response); + } + channel.completeStream(); + } catch (Exception e) { + try { + channel.sendResponse(e); + } catch (IOException ioException) { + logger.error("Failed to send error response", ioException); + } + } + } + + private void sendStreamRequest( + DiscoveryNode targetNode, + BenchmarkStreamRequest request, + long requestStart, + BenchmarkContext ctx, + Runnable onComplete + ) { + try { + streamTransportService.sendRequest( + targetNode, + SHARD_ACTION_NAME, + request, + TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STREAM).build(), + new StreamTransportResponseHandler() { + @Override + public void handleStreamResponse(StreamTransportResponse streamResponse) { + try { + BenchmarkDataResponse response; + while ((response = streamResponse.nextResponse()) != null) { + ctx.totalBytes.addAndGet(response.getPayloadSize()); + } + ctx.totalRows.addAndGet(request.getRows()); + ctx.latencies.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - requestStart)); + streamResponse.close(); + } catch (Exception e) { + logger.error("Error processing stream response", e); + streamResponse.cancel("Error", e); + } finally { + ctx.latch.countDown(); + onComplete.run(); + } + } + + @Override + public void handleException(TransportException exp) { + logger.warn("Stream transport request failed: {}", exp.getMessage()); + ctx.latch.countDown(); + onComplete.run(); + } + + @Override + public String executor() { + return StreamTransportExamplePlugin.BENCHMARK_RESPONSE_POOL_NAME; + } + + @Override + public BenchmarkDataResponse read(StreamInput in) throws IOException { + return new BenchmarkDataResponse(in); + } + } + ); + } catch (Exception e) { + logger.warn("Failed to send stream request: {}", e.getMessage()); + ctx.latch.countDown(); + } + } + + private void sendRegularRequest( + DiscoveryNode targetNode, + BenchmarkStreamRequest request, + long requestStart, + BenchmarkContext ctx, + Runnable onComplete + ) { + try { + transportService.sendRequest(targetNode, SHARD_ACTION_NAME, request, new TransportResponseHandler() { + @Override + 
public void handleResponse(BenchmarkDataResponse response) { + ctx.totalRows.addAndGet(request.getRows()); + ctx.totalBytes.addAndGet(response.getPayloadSize()); + ctx.latencies.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - requestStart)); + ctx.latch.countDown(); + onComplete.run(); + } + + @Override + public void handleException(TransportException exp) { + logger.warn("Transport request failed: {}", exp.getMessage()); + ctx.latch.countDown(); + onComplete.run(); + } + + @Override + public String executor() { + return StreamTransportExamplePlugin.BENCHMARK_RESPONSE_POOL_NAME; + } + + @Override + public BenchmarkDataResponse read(StreamInput in) throws IOException { + return new BenchmarkDataResponse(in); + } + }); + } catch (Exception e) { + logger.warn("Failed to send regular request: {}", e.getMessage()); + ctx.latch.countDown(); + } + } + + private long calculateTotalBytes(BenchmarkStreamRequest request) { + return (long) request.getRows() * request.getColumns() * request.getAvgColumnLength(); + } + + private long calculateBytesPerRow(BenchmarkStreamRequest request) { + return (long) request.getColumns() * request.getAvgColumnLength(); + } + + private byte[] generateSyntheticData(long bytes) { + if (bytes <= 0) return new byte[0]; + if (bytes > Integer.MAX_VALUE) { + throw new IllegalArgumentException( + "Payload size " + + bytes + + " bytes exceeds max array size for regular transport. " + + "Use stream transport (use_stream_transport=true) for payloads larger than 2GB." + ); + } + byte[] data = new byte[(int) bytes]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) (i & 0xFF); + } + return data; + } + + private BenchmarkStreamResponse calculateStats( + long totalRows, + long totalBytes, + long durationMs, + List latencies, + int parallelRequests, + boolean usedStreamTransport + ) { + if (latencies.isEmpty()) { + return new BenchmarkStreamResponse( + 0, + 0, + durationMs, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + parallelRequests, + usedStreamTransport, + null, + null, + null + ); + } + + Collections.sort(latencies); + + double durationSec = durationMs / MS_TO_SEC; + double throughputRowsPerSec = totalRows / durationSec; + double throughputMbPerSec = (totalBytes / BYTES_TO_MB) / durationSec; + + long min = latencies.get(0); + long max = latencies.get(latencies.size() - 1); + long avg = (long) latencies.stream().mapToLong(Long::longValue).average().orElse(0); + long p5 = latencies.get(Math.min((int) (latencies.size() * 0.05), latencies.size() - 1)); + long p10 = latencies.get(Math.min((int) (latencies.size() * 0.10), latencies.size() - 1)); + long p20 = latencies.get(Math.min((int) (latencies.size() * 0.20), latencies.size() - 1)); + long p25 = latencies.get(Math.min((int) (latencies.size() * 0.25), latencies.size() - 1)); + long p35 = latencies.get(Math.min((int) (latencies.size() * 0.35), latencies.size() - 1)); + long p50 = latencies.get(Math.min((int) (latencies.size() * 0.50), latencies.size() - 1)); + long p75 = latencies.get(Math.min((int) (latencies.size() * 0.75), latencies.size() - 1)); + long p90 = latencies.get(Math.min((int) (latencies.size() * 0.90), latencies.size() - 1)); + long p99 = latencies.get(Math.min((int) (latencies.size() * 0.99), latencies.size() - 1)); + + return new BenchmarkStreamResponse( + totalRows, + totalBytes, + durationMs, + throughputRowsPerSec, + throughputMbPerSec, + min, + max, + avg, + p5, + p10, + p20, + p25, + p35, + p50, + p75, + p90, + p99, + parallelRequests, + usedStreamTransport, + null, + 
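+            // threadPoolStats, flightTiming, and payload are left null in this summary response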
null, + null + ); + } + + private static class BenchmarkContext { + final int totalRequests; + final int maxInFlight; + final DiscoveryNode[] dataNodes; + final boolean useStream; + final CountDownLatch latch; + final List latencies; + final AtomicLong totalRows = new AtomicLong(); + final AtomicLong totalBytes = new AtomicLong(); + final AtomicLong activeRequests = new AtomicLong(); + final AtomicLong sentRequests = new AtomicLong(); + final AtomicLong refilling = new AtomicLong(); + + BenchmarkContext(int totalRequests, int maxInFlight, DiscoveryNode[] dataNodes, boolean useStream) { + this.totalRequests = totalRequests; + this.maxInFlight = maxInFlight; + this.dataNodes = dataNodes; + this.useStream = useStream; + this.latch = new CountDownLatch(totalRequests); + this.latencies = Collections.synchronizedList(new ArrayList<>(totalRequests)); + } + } + + private static class ThreadPoolSnapshot { + final Map pools; + + ThreadPoolSnapshot(Map pools) { + this.pools = pools; + } + + static class PoolStats { + final long queue; + final long completed; + final int active; + final long totalWaitTimeNanos; + + PoolStats(long queue, long completed, int active, long totalWaitTimeNanos) { + this.queue = queue; + this.completed = completed; + this.active = active; + this.totalWaitTimeNanos = totalWaitTimeNanos; + } + } + } + + private ThreadPoolSnapshot captureThreadPoolStats(String[] poolNames) { + Map pools = new HashMap<>(); + Set availablePools = new HashSet<>(); + for (Stats stats : threadPool.stats()) { + availablePools.add(stats.getName()); + for (String poolName : poolNames) { + if (stats.getName().equals(poolName)) { + pools.put( + poolName, + new ThreadPoolSnapshot.PoolStats( + stats.getQueue(), + stats.getCompleted(), + stats.getActive(), + stats.getWaitTimeNanos() + ) + ); + break; + } + } + } + + return new ThreadPoolSnapshot(pools); + } + + private BenchmarkStreamResponse.ThreadPoolStats calculateThreadPoolDiff(ThreadPoolSnapshot before, ThreadPoolSnapshot after) { + List poolStats = new ArrayList<>(); + for (String poolName : after.pools.keySet()) { + ThreadPoolSnapshot.PoolStats beforeStats = before.pools.get(poolName); + ThreadPoolSnapshot.PoolStats afterStats = after.pools.get(poolName); + if (beforeStats != null && afterStats != null) { + long waitTimeDiffNanos = afterStats.totalWaitTimeNanos - beforeStats.totalWaitTimeNanos; + poolStats.add( + new BenchmarkStreamResponse.PoolStat( + poolName, + afterStats.queue - beforeStats.queue, + afterStats.completed - beforeStats.completed, + afterStats.active, + TimeUnit.NANOSECONDS.toMillis(waitTimeDiffNanos), + afterStats.active, + (int) afterStats.queue, + poolName.contains("flight") ? 
new int[] { 0 } : null // Placeholder for event loop pending
+                    )
+                );
+            }
+        }
+        return new BenchmarkStreamResponse.ThreadPoolStats(poolStats);
+    }
+
+    private String captureThreadPoolState(String[] poolNames) {
+        StringBuilder state = new StringBuilder();
+        for (Stats stats : threadPool.stats()) {
+            for (String poolName : poolNames) {
+                if (stats.getName().equals(poolName)) {
+                    if (state.length() > 0) state.append(", ");
+                    state.append(poolName).append("[a=").append(stats.getActive()).append(",q=").append(stats.getQueue()).append("]");
+                    break;
+                }
+            }
+        }
+        return state.toString();
+    }
+}
diff --git a/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/benchmark/package-info.java b/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/benchmark/package-info.java
new file mode 100644
index 0000000000000..5e79b0a2bae53
--- /dev/null
+++ b/plugins/examples/stream-transport-example/src/main/java/org/opensearch/example/stream/benchmark/package-info.java
@@ -0,0 +1,12 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/**
+ * Example classes for benchmarking and comparing stream transport with regular transport
+ */
+package org.opensearch.example.stream.benchmark;