168 changes: 168 additions & 0 deletions plugins/examples/stream-transport-example/BENCHMARK.md
@@ -0,0 +1,168 @@
# Benchmark API

The Benchmark API allows you to compare the performance of Stream Transport (Arrow Flight) vs Regular Transport (Netty4) for node-to-node communication in OpenSearch.

## Package

`org.opensearch.example.stream.benchmark`

## Endpoint

```
POST /_benchmark/stream
```

## Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `rows` | int | 100 | Number of rows per request |
| `columns` | int | 10 | Number of columns per row |
| `avg_column_length` | int | 100 | Average column length in bytes |
| `parallel_requests` | int | 1 | Number of parallel requests to execute |
| `total_requests` | int | 0 | Total requests to send; total rows = `rows` × `total_requests` (0 = run `parallel_requests` requests only) |
| `batch_size` | int | 100 | Rows per batch for stream transport |
| `use_stream_transport` | boolean | true | Use stream transport (true) or regular transport (false) |

## Examples

### Stream Transport (Arrow Flight)

```bash
curl -X POST "localhost:9200/_benchmark/stream?rows=100000&parallel_requests=10&total_requests=600&batch_size=10000&use_stream_transport=true&pretty&human=true"
```

**Response:**
```json
{
"total_rows" : 60000000,
"total_bytes" : 57970000000,
"total_size" : "53.9gb",
"total_size_bytes" : 57970000000,
"duration_ms" : 47730,
"throughput_rows_per_sec" : "1257071.02",
"throughput_mb_per_sec" : "1158.28",
"latency_ms" : {
"min" : 179,
"max" : 1862,
"avg" : 1578,
"p5" : 1134,
"p10" : 1425,
"p20" : 1481,
"p25" : 1495,
"p35" : 1532,
"p50" : 1622,
"p75" : 1717,
"p90" : 1791,
"p99" : 1838
},
"parallel_requests" : 10,
"used_stream_transport" : true
}
```

### Regular Transport (Netty4)

```bash
curl -X POST "localhost:9200/_benchmark/stream?rows=100000&parallel_requests=10&total_requests=600&batch_size=10000&use_stream_transport=false&pretty&human=true"
```

**Response:**
```json
{
"total_rows" : 60000000,
"total_bytes" : 57970000000,
"total_size" : "53.9gb",
"total_size_bytes" : 57970000000,
"duration_ms" : 52100,
"throughput_rows_per_sec" : "1151632.57",
"throughput_mb_per_sec" : "1061.45",
"latency_ms" : {
"min" : 195,
"max" : 2012,
"avg" : 1723,
"p5" : 1245,
"p10" : 1567,
"p20" : 1623,
"p25" : 1645,
"p35" : 1689,
"p50" : 1756,
"p75" : 1834,
"p90" : 1923,
"p99" : 1987
},
"parallel_requests" : 10,
"used_stream_transport" : false
}
```
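
The `throughput_*` fields follow directly from `total_rows`, `total_bytes`, and `duration_ms`, and the stream-transport sample above matches when megabytes are taken as 1024 × 1024 bytes. A minimal cross-check (the `ThroughputCheck` class name is illustrative; small differences, as in the Netty4 sample, come from the un-rounded internal duration):

```java
// Cross-check of the throughput fields in the stream-transport sample response above.
public final class ThroughputCheck {
    public static void main(String[] args) {
        long totalRows = 60_000_000L;        // total_rows
        long totalBytes = 57_970_000_000L;   // total_bytes
        long durationMs = 47_730L;           // duration_ms

        double rowsPerSec = totalRows * 1000.0 / durationMs;                      // ~1257071.02
        double mbPerSec = (totalBytes / (1024.0 * 1024.0)) * 1000.0 / durationMs; // ~1158.28

        System.out.printf("rows/s=%.2f MB/s=%.2f%n", rowsPerSec, mbPerSec);
    }
}
```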

## Response Fields

| Field | Description |
|-------|-------------|
| `total_rows` | Total number of rows processed |
| `total_bytes` | Total bytes transferred |
| `total_size` | Human-readable size (with `human=true`) |
| `duration_ms` | Total duration in milliseconds |
| `throughput_rows_per_sec` | Rows processed per second |
| `throughput_mb_per_sec` | Megabytes transferred per second |
| `latency_ms.min` | Minimum request latency |
| `latency_ms.max` | Maximum request latency |
| `latency_ms.avg` | Average request latency |
| `latency_ms.p5` - `latency_ms.p99` | Latency percentiles |
| `parallel_requests` | Number of parallel requests used |
| `used_stream_transport` | Whether stream transport was used |
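
The percentile fields are derived from the per-request latencies recorded during the run. The plugin's exact method is not shown in this document; a nearest-rank calculation such as the sketch below (hypothetical `LatencyPercentiles` class, illustrative sample values) conveys the general idea:

```java
import java.util.Arrays;

// Hedged sketch: nearest-rank percentiles over recorded per-request latencies.
// The plugin's actual interpolation may differ.
public final class LatencyPercentiles {
    static long percentile(long[] latenciesMs, double p) {
        long[] sorted = latenciesMs.clone();
        Arrays.sort(sorted);
        // Nearest rank: smallest value with at least p% of samples at or below it.
        int idx = (int) Math.ceil((p / 100.0) * sorted.length) - 1;
        return sorted[Math.max(0, Math.min(idx, sorted.length - 1))];
    }

    public static void main(String[] args) {
        long[] samples = {210, 340, 460, 480, 520, 610, 700, 820, 950, 1200}; // illustrative latencies in ms
        System.out.println("p50=" + percentile(samples, 50)
            + " p90=" + percentile(samples, 90)
            + " p99=" + percentile(samples, 99));
    }
}
```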

## Use Cases

### 1. Compare Transport Performance

```bash
# Stream transport
curl -X POST "localhost:9200/_benchmark/stream?rows=10000&parallel_requests=10&use_stream_transport=true&pretty"

# Regular transport
curl -X POST "localhost:9200/_benchmark/stream?rows=10000&parallel_requests=10&use_stream_transport=false&pretty"
```

### 2. Test High Throughput

```bash
curl -X POST "localhost:9200/_benchmark/stream?rows=100000&parallel_requests=50&total_requests=1000&batch_size=10000&use_stream_transport=true&pretty&human=true"
```

### 3. Test Large Payloads

```bash
curl -X POST "localhost:9200/_benchmark/stream?rows=50000&columns=50&avg_column_length=1000&parallel_requests=5&use_stream_transport=true&pretty&human=true"
```

### 4. Latency Analysis

```bash
curl -X POST "localhost:9200/_benchmark/stream?rows=1000&parallel_requests=100&total_requests=1000&use_stream_transport=true&pretty" | jq '.latency_ms'
```

## Performance Tips

1. **Parallel Requests**: The API keeps up to 2x `parallel_requests` requests in flight to maximize throughput
2. **Batch Size**: For stream transport, larger batch sizes (10000+) reduce overhead
3. **Total Requests**: Use `total_requests` to send more requests than `parallel_requests` for sustained load testing
4. **Network**: Run benchmarks on nodes with good network connectivity for accurate results

## Implementation Details

- **Batching**: Maintains 2x `parallel_requests` in-flight requests using a lock-free refill mechanism (see the sketch after this list)
- **Thread Pools**: Uses dedicated `benchmark` and `benchmark_response` thread pools
- **Data Generation**: Generates synthetic data with non-compressible patterns
- **Latency Tracking**: Measures end-to-end latency, including all batches when stream transport is used
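
The lock-free refill is not spelled out in this document; the sketch below illustrates the idea described above, keeping a window of 2x `parallel_requests` requests in flight and launching a replacement as each one completes. `RefillSketch` and `RequestRunner` are hypothetical names, not classes from this plugin.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of a lock-free "keep 2x parallel_requests in flight" refill loop.
public final class RefillSketch {

    interface RequestRunner {
        void run(Runnable onComplete); // sends one benchmark request, calls onComplete once its response (all batches) arrives
    }

    public static void runAll(int totalRequests, int parallelRequests, RequestRunner runner) throws InterruptedException {
        AtomicInteger remaining = new AtomicInteger(totalRequests);
        CountDownLatch done = new CountDownLatch(totalRequests);
        int window = Math.min(totalRequests, parallelRequests * 2); // 2x cushion of in-flight requests

        Runnable launchNext = new Runnable() {
            @Override
            public void run() {
                // Atomically claim the next slot; only positive values are real slots.
                if (remaining.getAndDecrement() > 0) {
                    runner.run(() -> {
                        done.countDown();
                        this.run(); // refill: a completed request immediately launches the next one
                    });
                } else {
                    remaining.incrementAndGet(); // undo the speculative decrement
                }
            }
        };

        for (int i = 0; i < window; i++) {
            launchNext.run(); // prime the in-flight window
        }
        done.await(); // wait until every request has completed
    }
}
```

Because each completion claims the next slot with a single atomic decrement, no lock guards the in-flight window.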

## Integration Tests

See `BenchmarkStreamIT.java` for comprehensive test examples covering:
- Basic benchmarks
- Parallel requests
- Stream vs regular transport comparison
- Large payloads
- Custom batch sizes
- Latency percentile validation
19 changes: 18 additions & 1 deletion plugins/examples/stream-transport-example/README.md
@@ -1,6 +1,23 @@
# Stream Transport Example

-Step-by-step guide to implement streaming transport actions in OpenSearch.
+Example plugin demonstrating streaming transport actions and transport performance benchmarking.

## Benchmark API

Compare Flight (stream) vs Netty4 (regular) transport performance. See [BENCHMARK.md](BENCHMARK.md).

```bash
# Run tests
./gradlew :example-plugins:stream-transport-example:internalClusterTest --tests "org.opensearch.example.stream.benchmark.BenchmarkStreamIT"

# Compare transports
curl -X POST "localhost:9200/_benchmark/stream?rows=1000&parallel_requests=10&use_stream_transport=true"
curl -X POST "localhost:9200/_benchmark/stream?rows=1000&parallel_requests=10&use_stream_transport=false"
```

---

## Streaming Transport Guide

## Step 1: Create Action Definition

@@ -72,7 +72,7 @@ public void handleException(TransportException exp) {

@Override
public String executor() {
-return ThreadPool.Names.SAME;
+return ThreadPool.Names.GENERIC; // handle the response on the generic pool rather than on the calling transport thread
}

@Override
@@ -0,0 +1,175 @@
/*
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.example.stream.benchmark;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.arrow.flight.transport.FlightStreamPlugin;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.example.stream.StreamTransportExamplePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchTestCase.LockFeatureFlag;

import java.util.Collection;
import java.util.List;

import static org.opensearch.common.util.FeatureFlags.STREAM_TRANSPORT;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2)
public class BenchmarkStreamIT extends OpenSearchIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(StreamTransportExamplePlugin.class, FlightStreamPlugin.class);
}

@LockFeatureFlag(STREAM_TRANSPORT)
public void testBasicBenchmark() {
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForGreenStatus().get();
assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN));

BenchmarkStreamRequest request = new BenchmarkStreamRequest();
request.setRows(10);
request.setColumns(5);
request.setAvgColumnLength(100);
request.setParallelRequests(1);
request.setUseStreamTransport(true);

BenchmarkStreamResponse response = client().execute(BenchmarkStreamAction.INSTANCE, request).actionGet();

assertThat(response.getTotalRows(), equalTo(10L));
assertThat(response.getTotalBytes(), greaterThan(0L));
assertThat(response.getDurationMs(), greaterThan(0L));
assertThat(response.getThroughputRowsPerSec(), greaterThan(0.0));
assertThat(response.getMinLatencyMs(), greaterThan(0L));
assertThat(response.getMaxLatencyMs(), greaterThanOrEqualTo(response.getMinLatencyMs()));
assertThat(response.getParallelRequests(), equalTo(1));
assertTrue(response.isUsedStreamTransport());
}

@LockFeatureFlag(STREAM_TRANSPORT)
public void testParallelRequests() {
BenchmarkStreamRequest request = new BenchmarkStreamRequest();
request.setRows(50);
request.setColumns(10);
request.setParallelRequests(5);
request.setUseStreamTransport(true);

BenchmarkStreamResponse response = client().execute(BenchmarkStreamAction.INSTANCE, request).actionGet();

assertThat(response.getTotalRows(), equalTo(250L)); // 50 rows × 5 parallel
assertThat(response.getParallelRequests(), equalTo(5));
assertThat(response.getP90LatencyMs(), greaterThan(0L));
assertThat(response.getP99LatencyMs(), greaterThanOrEqualTo(response.getP90LatencyMs()));
}

@LockFeatureFlag(STREAM_TRANSPORT)
public void testStreamVsRegularTransport() {
// Test with stream transport
BenchmarkStreamRequest streamRequest = new BenchmarkStreamRequest();
streamRequest.setRows(100);
streamRequest.setColumns(10);
streamRequest.setParallelRequests(2);
streamRequest.setUseStreamTransport(true);

BenchmarkStreamResponse streamResponse = client().execute(BenchmarkStreamAction.INSTANCE, streamRequest).actionGet();
assertTrue(streamResponse.isUsedStreamTransport());
assertThat(streamResponse.getTotalRows(), equalTo(200L));

// Test with regular transport
BenchmarkStreamRequest regularRequest = new BenchmarkStreamRequest();
regularRequest.setRows(100);
regularRequest.setColumns(10);
regularRequest.setParallelRequests(2);
regularRequest.setUseStreamTransport(false);

BenchmarkStreamResponse regularResponse = client().execute(BenchmarkStreamAction.INSTANCE, regularRequest).actionGet();
assertFalse(regularResponse.isUsedStreamTransport());
assertThat(regularResponse.getTotalRows(), equalTo(200L));

// Both should produce results
assertThat(streamResponse.getThroughputRowsPerSec(), greaterThan(0.0));
assertThat(regularResponse.getThroughputRowsPerSec(), greaterThan(0.0));
}

@LockFeatureFlag(STREAM_TRANSPORT)
public void testLargePayload() {
BenchmarkStreamRequest request = new BenchmarkStreamRequest();
request.setRows(1000);
request.setColumns(20);
request.setAvgColumnLength(500);
request.setParallelRequests(3);

BenchmarkStreamResponse response = client().execute(BenchmarkStreamAction.INSTANCE, request).actionGet();

assertThat(response.getTotalRows(), equalTo(3000L));
assertThat(response.getTotalBytes(), greaterThan(1_000_000L)); // > 1MB
assertThat(response.getThroughputMbPerSec(), greaterThan(0.0));
}

@LockFeatureFlag(STREAM_TRANSPORT)
public void testValidation() {
BenchmarkStreamRequest request = new BenchmarkStreamRequest();
request.setRows(-1);

try {
client().execute(BenchmarkStreamAction.INSTANCE, request).actionGet();
fail("Expected validation exception");
} catch (Exception e) {
assertTrue(e.getMessage().contains("rows must be > 0"));
}
}

@LockFeatureFlag(STREAM_TRANSPORT)
public void testThreadPoolConfiguration() {
BenchmarkStreamRequest request = new BenchmarkStreamRequest();
request.setRows(10);
request.setThreadPool("generic");
request.setParallelRequests(1);

BenchmarkStreamResponse response = client().execute(BenchmarkStreamAction.INSTANCE, request).actionGet();
assertThat(response.getTotalRows(), equalTo(10L));
}

@LockFeatureFlag(STREAM_TRANSPORT)
public void testLatencyPercentiles() {
BenchmarkStreamRequest request = new BenchmarkStreamRequest();
request.setRows(50);
request.setParallelRequests(10);

BenchmarkStreamResponse response = client().execute(BenchmarkStreamAction.INSTANCE, request).actionGet();

// Verify percentile ordering
assertThat(response.getMinLatencyMs(), lessThanOrEqualTo(response.getAvgLatencyMs()));
assertThat(response.getAvgLatencyMs(), lessThanOrEqualTo(response.getP90LatencyMs()));
assertThat(response.getP90LatencyMs(), lessThanOrEqualTo(response.getP99LatencyMs()));
assertThat(response.getP99LatencyMs(), lessThanOrEqualTo(response.getMaxLatencyMs()));
}

@LockFeatureFlag(STREAM_TRANSPORT)
public void testCustomBatchSize() {
BenchmarkStreamRequest smallBatchRequest = new BenchmarkStreamRequest();
smallBatchRequest.setRows(100);
smallBatchRequest.setBatchSize(10);
smallBatchRequest.setUseStreamTransport(true);

BenchmarkStreamResponse smallBatchResponse = client().execute(BenchmarkStreamAction.INSTANCE, smallBatchRequest).actionGet();
assertThat(smallBatchResponse.getTotalRows(), equalTo(100L));
assertTrue(smallBatchResponse.isUsedStreamTransport());

BenchmarkStreamRequest largeBatchRequest = new BenchmarkStreamRequest();
largeBatchRequest.setRows(100);
largeBatchRequest.setBatchSize(500);
largeBatchRequest.setUseStreamTransport(true);

BenchmarkStreamResponse largeBatchResponse = client().execute(BenchmarkStreamAction.INSTANCE, largeBatchRequest).actionGet();
assertThat(largeBatchResponse.getTotalRows(), equalTo(100L));
assertTrue(largeBatchResponse.isUsedStreamTransport());
}
}