Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix SearchPhaseExecutionException to properly initCause ([#20320](https://github.com/opensearch-project/OpenSearch/pull/20320))
- Fix `cluster.remote.<cluster_alias>.server_name` setting no populating SNI ([#20321](https://github.com/opensearch-project/OpenSearch/pull/20321))
- Fix X-Opaque-Id header propagation (along with other response headers) for streaming Reactor Netty 4 transport ([#20371](https://github.com/opensearch-project/OpenSearch/pull/20371))
- Fix Thread context is not well restored for streaming ([#20361](https://github.com/opensearch-project/OpenSearch/pull/20361))

### Dependencies
- Bump `com.google.auth:google-auth-library-oauth2-http` from 1.38.0 to 1.41.0 ([#20183](https://github.com/opensearch-project/OpenSearch/pull/20183))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.http.reactor.netty4;

import io.netty.util.NettyRuntime;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchReactorNetty4IntegTestCase;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.FeatureFlagSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.http.HttpChunk;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.StreamingRestChannel;
import org.opensearch.tasks.Task;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.transport.Netty4ModulePlugin;
import org.opensearch.transport.client.node.NodeClient;
import org.opensearch.transport.reactor.ReactorNetty4Plugin;
import org.junit.Assert;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCounted;
import org.opensearch.transport.reactor.netty4.Netty4Utils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static org.opensearch.rest.RestRequest.Method.POST;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class ReactorNetty4ThreadContextRestorationIT extends OpenSearchReactorNetty4IntegTestCase {

@Override
protected boolean addMockHttpTransport() {
return false; // enable http
}

protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
for (Setting<?> builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
}
// disable Telemetry setting
featureSettings.put(FeatureFlags.TELEMETRY_SETTING.getKey(), false);
return featureSettings.build();
}
Comment on lines +73 to +81
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, find the test class and its parent
rg -n "class ReactorNetty4ThreadContextRestorationIT" --type java -A 3

# Search for featureFlagSettings method definitions in the codebase
rg -n "protected.*Settings.*featureFlagSettings" --type java -B 2 -A 5

# Check for invocations of featureFlagSettings
rg -n "featureFlagSettings()" --type java

Repository: opensearch-project/OpenSearch

Length of output: 19384


🏁 Script executed:

#!/bin/bash
# Find OpenSearchReactorNetty4IntegTestCase definition
rg -n "class OpenSearchReactorNetty4IntegTestCase" --type java -A 5

# Check its parent
rg -n "OpenSearchReactorNetty4IntegTestCase" --type java -B 2 | head -30

Repository: opensearch-project/OpenSearch

Length of output: 5410


Add @Override annotation to featureFlagSettings() method.

The method overrides the base implementation from OpenSearchIntegTestCase and is properly invoked by the test framework. However, it is missing the @Override annotation. Every other featureFlagSettings() override in the codebase includes this annotation for consistency and to enable compile-time detection of signature mismatches.

🤖 Prompt for AI Agents
In
@plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4ThreadContextRestorationIT.java
around lines 73 - 81, The method featureFlagSettings() in
ReactorNetty4ThreadContextRestorationIT is intended to override the base
implementation but lacks the @Override annotation; add the @Override annotation
above the featureFlagSettings() method declaration to match other overrides and
enable compile-time signature checks.


@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(ReactorNetty4Plugin.class, Netty4ModulePlugin.class, MockStreamingPlugin.class);
}

public static class MockStreamingPlugin extends Plugin implements ActionPlugin {
@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return List.of(new MockStreamingRestHandler());
}
}

public static class MockStreamingRestHandler extends BaseRestHandler {
@Override
public String getName() {
return "mock_ml_streaming_handler";
}

private final Logger logger = LogManager.getLogger(MockStreamingRestHandler.class);

@Override
public List<Route> routes() {
return List.of(new Route(POST, "/_plugins/_ml/models/_stream"));
}

@Override
public boolean supportsStreaming() {
return true;
}

@Override
public boolean supportsContentStream() {
return true;
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
return channel -> {
if (channel instanceof StreamingRestChannel) {
StreamingRestChannel streamingChannel = (StreamingRestChannel) channel;

// Mimic ML streaming behavior - store thread context
final ThreadContext.StoredContext storedContext = client.threadPool().getThreadContext().newStoredContext(true);
Copy link
Contributor

@reta reta Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Hailong-am OK, preliminary, I think the issue here is coming from improper ThreadContext manipulation within streaming handler implementation (in this case, ml-common): the client.threadPool().getThreadContext().newStoredContext(true) , context.restore(); and storedContext.close() are very likely executed in different threads, hence not restored properly. I need a bit more time to confirm that and suggest a fix if that is the issue

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the proper way to manage the context in this case would be;

final Supplier<ThreadContext.StoredContext> restorable = client
    .threadPool()
    .getThreadContext()
    .newRestorableContext(true);

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice finding. It turns out the way we used to managing thread context is not the right way and make the thread context not well restored.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reta with the correct way to manage context i don't have duplicate header issue anymore. Do you think this PR is still valid and worked as a fallback to clean up thread context?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Hailong-am we definitely should not do any fallbacks to clean the context - it should be fixed, not workarounded. But tracing related finding (https://github.com/opensearch-project/OpenSearch/pull/20361/changes#r2672199308) would certainly worth addressing. Thank you.


// Set streaming headers like ML action
Map<String, List<String>> headers = Map.of(
"Content-Type",
List.of("text/event-stream"),
"Cache-Control",
List.of("no-cache"),
"Connection",
List.of("keep-alive")
);
streamingChannel.prepareResponse(RestStatus.OK, headers);

// Use Flux pattern like ML streaming action
Flux.from(streamingChannel).ofType(HttpChunk.class).collectList().flatMap(chunks -> {
try (ThreadContext.StoredContext context = storedContext) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And than just use it like this:

try (ThreadContext.StoredContext ctx = restorable.get()) {
   .... // no need to call `context.restore();` 
}

context.restore();

String opaqueId = request.header(Task.X_OPAQUE_ID);
streamingChannel.sendChunk(
createHttpChunk("data: {\"status\":\"streaming\",\"opaque_id\":\"" + opaqueId + "\"}\n\n", false)
);

final CompletableFuture<HttpChunk> future = new CompletableFuture<>();

Flux.just(
createHttpChunk("data: {\"content\":\"test chunk 1\"}\n\n", false),
createHttpChunk("data: {\"content\":\"test chunk 2\"}\n\n", false),
createHttpChunk("data: {\"content\":\"final chunk\",\"is_last\":true}\n\n", true)
)
.delayElements(Duration.ofMillis(100))
.doOnNext(streamingChannel::sendChunk)
.doOnComplete(() -> future.complete(createHttpChunk("", true)))
.doOnError(future::completeExceptionally)
.subscribe(); // Simulate streaming delay

return Mono.fromCompletionStage(future);
} catch (Exception e) {
return Mono.error(e);
}
})
.doOnNext(streamingChannel::sendChunk)
.doFinally(signalType -> storedContext.close()) // Restore context when done
Comment on lines +147 to +175
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Double-close of StoredContext detected.

The storedContext is closed twice:

  1. Line 148: try (ThreadContext.StoredContext context = storedContext) auto-closes at block exit
  2. Line 175: doFinally(signalType -> storedContext.close()) closes again

This can lead to unexpected behavior. Consider removing one of the close mechanisms.

🐛 Proposed fix

Remove the try-with-resources and rely solely on doFinally for cleanup:

                     Flux.from(streamingChannel).ofType(HttpChunk.class).collectList().flatMap(chunks -> {
-                        try (ThreadContext.StoredContext context = storedContext) {
-                            context.restore();
+                        try {
+                            storedContext.restore();
 
                             String opaqueId = request.header(Task.X_OPAQUE_ID);
                             streamingChannel.sendChunk(
@@ -168,7 +168,7 @@ public class ReactorNetty4ThreadContextRestorationIT extends OpenSearchReactorNe
                         }
                     })
                         .doOnNext(streamingChannel::sendChunk)
-                        .doFinally(signalType -> storedContext.close()) // Restore context when done
+                        .doFinally(signalType -> storedContext.close())
                         .onErrorResume(ex -> {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Flux.from(streamingChannel).ofType(HttpChunk.class).collectList().flatMap(chunks -> {
try (ThreadContext.StoredContext context = storedContext) {
context.restore();
String opaqueId = request.header(Task.X_OPAQUE_ID);
streamingChannel.sendChunk(
createHttpChunk("data: {\"status\":\"streaming\",\"opaque_id\":\"" + opaqueId + "\"}\n\n", false)
);
final CompletableFuture<HttpChunk> future = new CompletableFuture<>();
Flux.just(
createHttpChunk("data: {\"content\":\"test chunk 1\"}\n\n", false),
createHttpChunk("data: {\"content\":\"test chunk 2\"}\n\n", false),
createHttpChunk("data: {\"content\":\"final chunk\",\"is_last\":true}\n\n", true)
)
.delayElements(Duration.ofMillis(100))
.doOnNext(streamingChannel::sendChunk)
.doOnComplete(() -> future.complete(createHttpChunk("", true)))
.doOnError(future::completeExceptionally)
.subscribe(); // Simulate streaming delay
return Mono.fromCompletionStage(future);
} catch (Exception e) {
return Mono.error(e);
}
})
.doOnNext(streamingChannel::sendChunk)
.doFinally(signalType -> storedContext.close()) // Restore context when done
Flux.from(streamingChannel).ofType(HttpChunk.class).collectList().flatMap(chunks -> {
try {
storedContext.restore();
String opaqueId = request.header(Task.X_OPAQUE_ID);
streamingChannel.sendChunk(
createHttpChunk("data: {\"status\":\"streaming\",\"opaque_id\":\"" + opaqueId + "\"}\n\n", false)
);
final CompletableFuture<HttpChunk> future = new CompletableFuture<>();
Flux.just(
createHttpChunk("data: {\"content\":\"test chunk 1\"}\n\n", false),
createHttpChunk("data: {\"content\":\"test chunk 2\"}\n\n", false),
createHttpChunk("data: {\"content\":\"final chunk\",\"is_last\":true}\n\n", true)
)
.delayElements(Duration.ofMillis(100))
.doOnNext(streamingChannel::sendChunk)
.doOnComplete(() -> future.complete(createHttpChunk("", true)))
.doOnError(future::completeExceptionally)
.subscribe(); // Simulate streaming delay
return Mono.fromCompletionStage(future);
} catch (Exception e) {
return Mono.error(e);
}
})
.doOnNext(streamingChannel::sendChunk)
.doFinally(signalType -> storedContext.close())
🤖 Prompt for AI Agents
In
@plugins/transport-reactor-netty4/src/internalClusterTest/java/org/opensearch/http/reactor/netty4/ReactorNetty4ThreadContextRestorationIT.java
around lines 147 - 175, The code currently double-closes storedContext by using
try-with-resources (try (ThreadContext.StoredContext context = storedContext) {
... }) which auto-closes, and also calling storedContext.close() in doFinally;
remove the try-with-resources construct: replace the try(...) block with a plain
block that calls storedContext.restore() (use storedContext.restore() instead of
context.restore()) and remove the corresponding automatic close, leaving the
doFinally(signalType -> storedContext.close()) as the sole cleanup.

.onErrorResume(ex -> {
try {
HttpChunk errorChunk = createHttpChunk("data: {\"error\":\"" + ex.getMessage() + "\"}\n\n", true);
streamingChannel.sendChunk(errorChunk);
} catch (Exception e) {
// Log error
}
return Mono.empty();
})
.subscribe();
}
};
}

private HttpChunk createHttpChunk(String sseData, boolean isLast) {
BytesReference bytesRef = BytesReference.fromByteBuffer(ByteBuffer.wrap(sseData.getBytes()));
return new HttpChunk() {
@Override
public void close() {
if (bytesRef instanceof Releasable) {
((Releasable) bytesRef).close();
}
}

@Override
public boolean isLast() {
return isLast;
}

@Override
public BytesReference content() {
return bytesRef;
}
};
}
}

public void testThreadContextRestorationWithDuplicateOpaqueId() throws Exception {
ensureGreen();

HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = randomFrom(boundAddresses);

List<Tuple<String, CharSequence>> requests = new ArrayList<>();
int cores = NettyRuntime.availableProcessors();
for (int i = 0; i < cores + 1; i++) {
requests.add(Tuple.tuple("/_plugins/_ml/models/_stream", "dummy request body"));
}
int i = 0;
while (i++ < requests.size()) {
try (ReactorHttpClient nettyHttpClient = ReactorHttpClient.create(false)) {
Collection<FullHttpResponse> singleResponse = nettyHttpClient.post(transportAddress.address(), requests.subList(0, 1));
try {
Assert.assertEquals(1, singleResponse.size());
FullHttpResponse response = singleResponse.iterator().next();
String responseBody = response.content().toString(CharsetUtil.UTF_8);
Assert.assertEquals(200, response.status().code());
Assert.assertFalse(responseBody.contains("value for key [X-Opaque-Id] already present"));
} finally {
singleResponse.forEach(ReferenceCounted::release);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.common.util.net.NetUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
Expand Down Expand Up @@ -381,6 +382,8 @@ protected Publisher<Void> incomingRequest(HttpServerRequest request, HttpServerR
request.params()
);
if (dispatchHandlerOpt.map(RestHandler::supportsStreaming).orElse(false)) {
final ThreadContext.StoredContext storedContext = threadPool.getThreadContext().newStoredContext(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's find out the cause first before doing that, the polluted thread context may pop up in other places and we should understand first why it happens.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah the test cass could reproduce it and you could help to check whether other place could potentially have same issue


final ReactorNetty4StreamingRequestConsumer<HttpContent> consumer = new ReactorNetty4StreamingRequestConsumer<>(
this,
request,
Expand All @@ -392,7 +395,9 @@ protected Publisher<Void> incomingRequest(HttpServerRequest request, HttpServerR
.subscribe(consumer, error -> {}, () -> consumer.accept(DefaultLastHttpContent.EMPTY_LAST_CONTENT));

incomingStream(new ReactorNetty4HttpRequest(request), consumer.httpChannel());
return response.sendObject(consumer);

// restore ThreadContext when streaming is finished
return response.sendObject(consumer).then(Mono.fromRunnable(storedContext::close));
} else {
final ReactorNetty4NonStreamingRequestConsumer<HttpContent> consumer = new ReactorNetty4NonStreamingRequestConsumer<>(
this,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.http.HttpChunk;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.telemetry.tracing.channels.TraceableRestChannel;
import org.opensearch.transport.client.node.NodeClient;
import org.opensearch.usage.UsageService;

Expand Down Expand Up @@ -345,6 +346,10 @@ private void dispatchRequest(RestRequest request, RestChannel channel, RestHandl
}

if (handler.supportsStreaming()) {
// for the case TraceableRestChannel wrapped steamingRestChannel, we need to unwrap it
if (channel instanceof TraceableRestChannel traceableRestChannel) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch with TraceableRestChannel, to address this issue, we would need:

  • introduce TraceableStreamingRestChannel that implements StreamingRestChannel
  • change TraceableRestChannel to return either TraceableRestChannel or TraceableStreamingRestChannel

channel = traceableRestChannel.unwrap();
}
// The handler may support streaming but not the engine, in this case we fail with the bad request
if (channel instanceof StreamingRestChannel streamingRestChannel) {
responseChannel = new StreamHandlingHttpChannel(streamingRestChannel, circuitBreakerService, contentLength);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ public static RestChannel create(RestChannel delegate, Span span, Tracer tracer)
}
}

/**
* Returns delegate.
* @return delegate
*/
public RestChannel unwrap() {
return delegate;
}

@Override
public XContentBuilder newBuilder() throws IOException {
return delegate.newBuilder();
Expand Down
Loading