From 240a223da8b60c0cf30bdc3044cb2caa2afa2465 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 13 Oct 2025 20:14:03 +0530 Subject: [PATCH 01/18] Update --- .../ensemble_backpressure_test.py | 233 ++++++++++++++++++ .../models/decoupled_producer/1/model.py | 54 ++++ .../models/decoupled_producer/config.pbtxt | 58 +++++ .../config.pbtxt | 78 ++++++ .../config.pbtxt | 83 +++++++ .../models/slow_consumer/1/model.py | 52 ++++ .../models/slow_consumer/config.pbtxt | 54 ++++ qa/L0_simple_ensemble/test.sh | 36 ++- 8 files changed, 647 insertions(+), 1 deletion(-) create mode 100644 qa/L0_simple_ensemble/ensemble_backpressure_test.py create mode 100644 qa/L0_simple_ensemble/models/decoupled_producer/1/model.py create mode 100644 qa/L0_simple_ensemble/models/decoupled_producer/config.pbtxt create mode 100644 qa/L0_simple_ensemble/models/ensemble_disabled_max_inflight_responses/config.pbtxt create mode 100644 qa/L0_simple_ensemble/models/ensemble_enabled_max_inflight_responses/config.pbtxt create mode 100644 qa/L0_simple_ensemble/models/slow_consumer/1/model.py create mode 100644 qa/L0_simple_ensemble/models/slow_consumer/config.pbtxt diff --git a/qa/L0_simple_ensemble/ensemble_backpressure_test.py b/qa/L0_simple_ensemble/ensemble_backpressure_test.py new file mode 100644 index 0000000000..e2919e5da5 --- /dev/null +++ b/qa/L0_simple_ensemble/ensemble_backpressure_test.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python3 + +# Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import sys + +sys.path.append("../common") + +import queue +import unittest +from functools import partial +import numpy as np +import test_util as tu +import tritonclient.grpc as grpcclient + + +SERVER_URL = "localhost:8001" +DEFAULT_RESPONSE_TIMEOUT = 60 + + +class UserData: + def __init__(self): + self._response_queue = queue.Queue() + + +def callback(user_data, result, error): + if error: + user_data._response_queue.put(error) + else: + user_data._response_queue.put(result) + + +class EnsembleBackpressureTest(tu.TestResultCollector): + """ + Tests for ensemble backpressure feature (max_ensemble_inflight_responses). + """ + + def _prepare_infer_args(self, input_value): + """ + Create InferInput/InferRequestedOutput lists + """ + input_data = np.array([input_value], dtype=np.int32) + infer_input = [grpcclient.InferInput("IN", input_data.shape, "INT32")] + infer_input[0].set_data_from_numpy(input_data) + outputs = [grpcclient.InferRequestedOutput("OUT")] + return infer_input, outputs + + def _collect_responses(self, user_data): + """ + Collect responses from user_data until the final response flag is seen. + """ + responses = [] + while True: + try: + result = user_data._response_queue.get(timeout=DEFAULT_RESPONSE_TIMEOUT) + except queue.Empty: + self.fail( + f"No response received within {DEFAULT_RESPONSE_TIMEOUT} seconds." + ) + + self.assertNotIsInstance( + result, Exception, f"Callback returned an exception: {result}" + ) + + # Add response to list if it has data (not empty final-only response) + response = result.get_response() + if len(response.outputs) > 0: + responses.append(result) + + # Check if this is the final response + final = response.parameters.get("triton_final_response") + if final and final.bool_param: + break + + return responses + + def test_backpressure_limits_inflight(self): + """ + Test that max_ensemble_inflight_responses correctly limits concurrent + responses and prevents unbounded memory growth. + """ + model_name = "ensemble_enabled_max_inflight_responses" + expected_count = 16 + user_data = UserData() + + with grpcclient.InferenceServerClient(SERVER_URL) as triton_client: + try: + inputs, outputs = self._prepare_infer_args(expected_count) + + triton_client.start_stream(callback=partial(callback, user_data)) + + triton_client.async_stream_infer( + model_name=model_name, inputs=inputs, outputs=outputs + ) + + # Collect responses + responses = self._collect_responses(user_data) + + # Verify we received the expected number of responses + self.assertEqual( + len(responses), + expected_count, + f"Expected {expected_count} responses, got {len(responses)}", + ) + + # Verify correctness of responses + for idx, resp in enumerate(responses): + output = resp.as_numpy("OUT") + self.assertEqual( + output[0], idx, f"Response {idx} has incorrect value" + ) + + finally: + triton_client.stop_stream() + + def test_backpressure_disabled(self): + """ + Test that ensemble model without max_ensemble_inflight_responses parameter + works fine (original behavior). + """ + model_name = "ensemble_disabled_max_inflight_responses" + expected_count = 16 + user_data = UserData() + + with grpcclient.InferenceServerClient(SERVER_URL) as triton_client: + try: + inputs, outputs = self._prepare_infer_args(expected_count) + + triton_client.start_stream(callback=partial(callback, user_data)) + + triton_client.async_stream_infer( + model_name=model_name, inputs=inputs, outputs=outputs + ) + + # Collect responses + responses = self._collect_responses(user_data) + + # Verify we received the expected number of responses + self.assertEqual( + len(responses), + expected_count, + f"Expected {expected_count} responses, got {len(responses)}", + ) + + # Verify correctness of responses + for idx, resp in enumerate(responses): + output = resp.as_numpy("OUT") + self.assertEqual( + output[0], idx, f"Response {idx} has incorrect value" + ) + + finally: + triton_client.stop_stream() + + def test_backpressure_concurrent_requests(self): + """ + Test that backpressure works correctly with multiple concurrent requests. + Each request should have independent backpressure state. + """ + model_name = "ensemble_enabled_max_inflight_responses" + num_concurrent = 8 + expected_per_request = 8 + + clients = [] + user_datas = [] + + try: + inputs, outputs = self._prepare_infer_args(expected_per_request) + + # Create separate client for each concurrent request + for i in range(num_concurrent): + client = grpcclient.InferenceServerClient(SERVER_URL) + user_data = UserData() + + client.start_stream(callback=partial(callback, user_data)) + client.async_stream_infer( + model_name=model_name, inputs=inputs, outputs=outputs + ) + + clients.append(client) + user_datas.append(user_data) + + # Collect and verify responses for all requests + for i, ud in enumerate(user_datas): + responses = self._collect_responses(ud) + self.assertEqual( + len(responses), + expected_per_request, + f"Request {i}: expected {expected_per_request} responses, got {len(responses)}", + ) + + # Verify correctness of responses + for idx, resp in enumerate(responses): + output = resp.as_numpy("OUT") + self.assertEqual( + output[0], idx, f"Response {idx} has incorrect value" + ) + + finally: + for client in clients: + try: + client.stop_stream() + client.close() + except: + pass + + +if __name__ == "__main__": + unittest.main() diff --git a/qa/L0_simple_ensemble/models/decoupled_producer/1/model.py b/qa/L0_simple_ensemble/models/decoupled_producer/1/model.py new file mode 100644 index 0000000000..14199c72dc --- /dev/null +++ b/qa/L0_simple_ensemble/models/decoupled_producer/1/model.py @@ -0,0 +1,54 @@ +# Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import numpy as np +import triton_python_backend_utils as pb_utils + + +class TritonPythonModel: + """ + Decoupled model that produces N responses based on input value. + """ + + def execute(self, requests): + for request in requests: + # Get input - number of responses to produce + in_tensor = pb_utils.get_input_tensor_by_name(request, "IN") + count = in_tensor.as_numpy()[0] + + response_sender = request.get_response_sender() + + # Produce 'count' responses + for i in range(count): + out_tensor = pb_utils.Tensor("OUT", np.array([i], dtype=np.int32)) + response = pb_utils.InferenceResponse(output_tensors=[out_tensor]) + response_sender.send(response) + + # Send final flag + response_sender.send(flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL) + + return None diff --git a/qa/L0_simple_ensemble/models/decoupled_producer/config.pbtxt b/qa/L0_simple_ensemble/models/decoupled_producer/config.pbtxt new file mode 100644 index 0000000000..23f14f9be5 --- /dev/null +++ b/qa/L0_simple_ensemble/models/decoupled_producer/config.pbtxt @@ -0,0 +1,58 @@ +# Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +name: "decoupled_producer" +backend: "python" +max_batch_size: 0 + +input [ + { + name: "IN" + data_type: TYPE_INT32 + dims: [ 1 ] + } +] + +output [ + { + name: "OUT" + data_type: TYPE_INT32 + dims: [ 1 ] + } +] + +instance_group [ + { + count: 1 + kind: KIND_CPU + } +] + +model_transaction_policy { + decoupled: true +} + diff --git a/qa/L0_simple_ensemble/models/ensemble_disabled_max_inflight_responses/config.pbtxt b/qa/L0_simple_ensemble/models/ensemble_disabled_max_inflight_responses/config.pbtxt new file mode 100644 index 0000000000..eeae9c4ea3 --- /dev/null +++ b/qa/L0_simple_ensemble/models/ensemble_disabled_max_inflight_responses/config.pbtxt @@ -0,0 +1,78 @@ +# Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +name: "ensemble_disabled_max_inflight_responses" +platform: "ensemble" +max_batch_size: 0 + +# No backpressure parameter - tests backward compatibility + +input [ + { + name: "IN" + data_type: TYPE_INT32 + dims: [ 1 ] + } +] + +output [ + { + name: "OUT" + data_type: TYPE_INT32 + dims: [ 1 ] + } +] + +ensemble_scheduling { + step [ + { + model_name: "decoupled_producer" + model_version: -1 + input_map { + key: "IN" + value: "IN" + } + output_map { + key: "OUT" + value: "intermediate" + } + }, + { + model_name: "slow_consumer" + model_version: -1 + input_map { + key: "IN" + value: "intermediate" + } + output_map { + key: "OUT" + value: "OUT" + } + } + ] +} + diff --git a/qa/L0_simple_ensemble/models/ensemble_enabled_max_inflight_responses/config.pbtxt b/qa/L0_simple_ensemble/models/ensemble_enabled_max_inflight_responses/config.pbtxt new file mode 100644 index 0000000000..908a420700 --- /dev/null +++ b/qa/L0_simple_ensemble/models/ensemble_enabled_max_inflight_responses/config.pbtxt @@ -0,0 +1,83 @@ +# Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +name: "ensemble_enabled_max_inflight_responses" +platform: "ensemble" +max_batch_size: 0 + +# Backpressure configuration - limits inflight responses from +# decoupled step to avoid unbounded memory growth +parameters: { + key: "max_ensemble_inflight_responses" + value: { string_value: "4" } +} + +input [ + { + name: "IN" + data_type: TYPE_INT32 + dims: [ 1 ] + } +] + +output [ + { + name: "OUT" + data_type: TYPE_INT32 + dims: [ 1 ] + } +] + +ensemble_scheduling { + step [ + { + model_name: "decoupled_producer" + model_version: -1 + input_map { + key: "IN" + value: "IN" + } + output_map { + key: "OUT" + value: "intermediate" + } + }, + { + model_name: "slow_consumer" + model_version: -1 + input_map { + key: "IN" + value: "intermediate" + } + output_map { + key: "OUT" + value: "OUT" + } + } + ] +} + diff --git a/qa/L0_simple_ensemble/models/slow_consumer/1/model.py b/qa/L0_simple_ensemble/models/slow_consumer/1/model.py new file mode 100644 index 0000000000..c7c3c66ed6 --- /dev/null +++ b/qa/L0_simple_ensemble/models/slow_consumer/1/model.py @@ -0,0 +1,52 @@ +# Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +import time +import numpy as np +import triton_python_backend_utils as pb_utils + + +class TritonPythonModel: + """ + Slow consumer model - adds delay to simulate slow processing. + """ + + def execute(self, requests): + responses = [] + + for request in requests: + # Add delay (200ms per request) + time.sleep(0.200) + + # Pass through the input + in_tensor = pb_utils.get_input_tensor_by_name(request, "IN") + out_tensor = pb_utils.Tensor("OUT", in_tensor.as_numpy()) + + response = pb_utils.InferenceResponse(output_tensors=[out_tensor]) + responses.append(response) + + return responses diff --git a/qa/L0_simple_ensemble/models/slow_consumer/config.pbtxt b/qa/L0_simple_ensemble/models/slow_consumer/config.pbtxt new file mode 100644 index 0000000000..64b1f1d315 --- /dev/null +++ b/qa/L0_simple_ensemble/models/slow_consumer/config.pbtxt @@ -0,0 +1,54 @@ +# Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + +name: "slow_consumer" +backend: "python" +max_batch_size: 0 + +input [ + { + name: "IN" + data_type: TYPE_INT32 + dims: [ 1 ] + } +] + +output [ + { + name: "OUT" + data_type: TYPE_INT32 + dims: [ 1 ] + } +] + +instance_group [ + { + count: 1 + kind: KIND_CPU + } +] + diff --git a/qa/L0_simple_ensemble/test.sh b/qa/L0_simple_ensemble/test.sh index 927f36ea32..e399b1f264 100755 --- a/qa/L0_simple_ensemble/test.sh +++ b/qa/L0_simple_ensemble/test.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright 2019-2024, NVIDIA CORPORATION. All rights reserved. +# Copyright 2019-2025, NVIDIA CORPORATION. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -40,6 +40,8 @@ source ../common/util.sh # ensure ensemble models have version sub-directory mkdir -p `pwd`/models/ensemble_add_sub_int32_int32_int32/1 mkdir -p `pwd`/models/ensemble_partial_add_sub/1 +mkdir -p `pwd`/models/ensemble_enabled_max_inflight_responses/1 +mkdir -p `pwd`/models/ensemble_disabled_max_inflight_responses/1 rm -f $CLIENT_LOG $SERVER_LOG @@ -146,6 +148,38 @@ set -e kill $SERVER_PID wait $SERVER_PID +# Test ensemble backpressure feature (max_ensemble_inflight_responses parameter) +SERVER_LOG="./ensemble_backpressure_test_server.log" +CLIENT_LOG="./ensemble_backpressure_test_client.log" +BACKPRESSURE_TEST_PY=./ensemble_backpressure_test.py + +rm -f $SERVER_LOG $CLIENT_LOG + +SERVER_ARGS="--model-repository=`pwd`/models" +run_server +if [ "$SERVER_PID" == "0" ]; then + echo -e "\n***\n*** Failed to start $SERVER\n***" + cat $SERVER_LOG + exit 1 +fi + +set +e +python $BACKPRESSURE_TEST_PY -v >> $CLIENT_LOG 2>&1 +if [ $? -ne 0 ]; then + RET=1 +else + check_test_results $TEST_RESULT_FILE 3 + if [ $? -ne 0 ]; then + cat $CLIENT_LOG + echo -e "\n***\n*** Test Result Verification Failed\n***" + RET=1 + fi +fi +set -e + +kill $SERVER_PID +wait $SERVER_PID + if [ $RET -eq 0 ]; then echo -e "\n***\n*** Test Passed\n***" else From 974aa252bdfc322295df2011faed386b300f9abc Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 13 Oct 2025 21:11:50 +0530 Subject: [PATCH 02/18] Update --- .../ensemble_disabled_max_inflight_responses/config.pbtxt | 3 --- .../ensemble_enabled_max_inflight_responses/config.pbtxt | 1 - 2 files changed, 4 deletions(-) diff --git a/qa/L0_simple_ensemble/models/ensemble_disabled_max_inflight_responses/config.pbtxt b/qa/L0_simple_ensemble/models/ensemble_disabled_max_inflight_responses/config.pbtxt index eeae9c4ea3..c8566dda41 100644 --- a/qa/L0_simple_ensemble/models/ensemble_disabled_max_inflight_responses/config.pbtxt +++ b/qa/L0_simple_ensemble/models/ensemble_disabled_max_inflight_responses/config.pbtxt @@ -25,12 +25,9 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -name: "ensemble_disabled_max_inflight_responses" platform: "ensemble" max_batch_size: 0 -# No backpressure parameter - tests backward compatibility - input [ { name: "IN" diff --git a/qa/L0_simple_ensemble/models/ensemble_enabled_max_inflight_responses/config.pbtxt b/qa/L0_simple_ensemble/models/ensemble_enabled_max_inflight_responses/config.pbtxt index 908a420700..8ae33339bb 100644 --- a/qa/L0_simple_ensemble/models/ensemble_enabled_max_inflight_responses/config.pbtxt +++ b/qa/L0_simple_ensemble/models/ensemble_enabled_max_inflight_responses/config.pbtxt @@ -25,7 +25,6 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -name: "ensemble_enabled_max_inflight_responses" platform: "ensemble" max_batch_size: 0 From 337e0a72cd1d103116eb53acee799d895e0f5558 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 13 Oct 2025 21:41:43 +0530 Subject: [PATCH 03/18] Update --- qa/L0_simple_ensemble/test.sh | 52 +++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/qa/L0_simple_ensemble/test.sh b/qa/L0_simple_ensemble/test.sh index e399b1f264..8d75ebab8e 100755 --- a/qa/L0_simple_ensemble/test.sh +++ b/qa/L0_simple_ensemble/test.sh @@ -180,6 +180,58 @@ set -e kill $SERVER_PID wait $SERVER_PID + +# Test invalid values for max_ensemble_inflight_responses parameter +mkdir -p `pwd`/models/ensemble_invalid_negative_limit/1 +mkdir -p `pwd`/models/ensemble_invalid_string_limit/1 + +cp `pwd`/models/ensemble_disabled_max_inflight_responses/config.pbtxt `pwd`/models/ensemble_invalid_negative_limit/ +cat <> `pwd`/models/ensemble_invalid_negative_limit/config.pbtxt +parameters: { + key: "max_ensemble_inflight_responses" + value: { string_value: "-5" } +} +EOF + +cp `pwd`/models/ensemble_disabled_max_inflight_responses/config.pbtxt `pwd`/models/ensemble_invalid_string_limit/ +cat <> `pwd`/models/ensemble_invalid_string_limit/config.pbtxt +parameters: { + key: "max_ensemble_inflight_responses" + value: { string_value: "invalid_value" } +} +EOF + + +SERVER_LOG="./not_on_path_server.log" +rm -f $SERVER_LOG + +run_server +if [ "$SERVER_PID" != "0" ]; then + echo -e "\n***\n*** FAILED: unexpected success starting $SERVER\n***" + kill_server + RET=1 +fi + +set +e +# Verify valid config was parsed correctly +if ! grep -q "Ensemble model 'ensemble_enabled_max_inflight_responses' configured with max_ensemble_inflight_responses: 4" $SERVER_LOG; then + echo -e "\n***\n*** FAILED: Expected configuration message not found\n***" + RET=1 +fi + +# Verify negative value was rejected +if ! grep -q "Ignoring 'max_ensemble_inflight_responses' for ensemble model 'ensemble_invalid_negative_limit': value must be positive, got -5" $SERVER_LOG; then + echo -e "\n***\n*** FAILED: Expected error message not found\n***" + RET=1 +fi + +# Verify invalid string was rejected +if ! grep -q "Failed to parse 'max_ensemble_inflight_responses' for ensemble 'ensemble_invalid_string_limit': stoll" $SERVER_LOG; then + echo -e "\n***\n*** FAILED: Expected error message not found\n***" + RET=1 +fi +set -e + if [ $RET -eq 0 ]; then echo -e "\n***\n*** Test Passed\n***" else From 05dcb71322519b4a645cb2d1eb5ec1cbe4c7c9d7 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 13 Oct 2025 23:09:59 +0530 Subject: [PATCH 04/18] Update --- qa/L0_simple_ensemble/ensemble_backpressure_test.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 qa/L0_simple_ensemble/ensemble_backpressure_test.py diff --git a/qa/L0_simple_ensemble/ensemble_backpressure_test.py b/qa/L0_simple_ensemble/ensemble_backpressure_test.py old mode 100644 new mode 100755 From 4f379eddaa007e9d6970e94184ee3149ae8cb37c Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 13 Oct 2025 23:18:39 +0530 Subject: [PATCH 05/18] Fix pre-commit --- .../ensemble_enabled_max_inflight_responses/config.pbtxt | 2 +- qa/L0_simple_ensemble/models/slow_consumer/1/model.py | 2 -- qa/L0_simple_ensemble/test.sh | 6 +++--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/qa/L0_simple_ensemble/models/ensemble_enabled_max_inflight_responses/config.pbtxt b/qa/L0_simple_ensemble/models/ensemble_enabled_max_inflight_responses/config.pbtxt index 8ae33339bb..51fa868f14 100644 --- a/qa/L0_simple_ensemble/models/ensemble_enabled_max_inflight_responses/config.pbtxt +++ b/qa/L0_simple_ensemble/models/ensemble_enabled_max_inflight_responses/config.pbtxt @@ -28,7 +28,7 @@ platform: "ensemble" max_batch_size: 0 -# Backpressure configuration - limits inflight responses from +# Backpressure configuration - limits inflight responses from # decoupled step to avoid unbounded memory growth parameters: { key: "max_ensemble_inflight_responses" diff --git a/qa/L0_simple_ensemble/models/slow_consumer/1/model.py b/qa/L0_simple_ensemble/models/slow_consumer/1/model.py index c7c3c66ed6..e82b2d8f61 100644 --- a/qa/L0_simple_ensemble/models/slow_consumer/1/model.py +++ b/qa/L0_simple_ensemble/models/slow_consumer/1/model.py @@ -24,9 +24,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - import time -import numpy as np import triton_python_backend_utils as pb_utils diff --git a/qa/L0_simple_ensemble/test.sh b/qa/L0_simple_ensemble/test.sh index 8d75ebab8e..7d4c33967f 100755 --- a/qa/L0_simple_ensemble/test.sh +++ b/qa/L0_simple_ensemble/test.sh @@ -186,7 +186,7 @@ mkdir -p `pwd`/models/ensemble_invalid_negative_limit/1 mkdir -p `pwd`/models/ensemble_invalid_string_limit/1 cp `pwd`/models/ensemble_disabled_max_inflight_responses/config.pbtxt `pwd`/models/ensemble_invalid_negative_limit/ -cat <> `pwd`/models/ensemble_invalid_negative_limit/config.pbtxt +cat <> `pwd`/models/ensemble_invalid_negative_limit/config.pbtxt parameters: { key: "max_ensemble_inflight_responses" value: { string_value: "-5" } @@ -194,7 +194,7 @@ parameters: { EOF cp `pwd`/models/ensemble_disabled_max_inflight_responses/config.pbtxt `pwd`/models/ensemble_invalid_string_limit/ -cat <> `pwd`/models/ensemble_invalid_string_limit/config.pbtxt +cat <> `pwd`/models/ensemble_invalid_string_limit/config.pbtxt parameters: { key: "max_ensemble_inflight_responses" value: { string_value: "invalid_value" } @@ -225,7 +225,7 @@ if ! grep -q "Ignoring 'max_ensemble_inflight_responses' for ensemble model 'ens RET=1 fi -# Verify invalid string was rejected +# Verify invalid string was rejected if ! grep -q "Failed to parse 'max_ensemble_inflight_responses' for ensemble 'ensemble_invalid_string_limit': stoll" $SERVER_LOG; then echo -e "\n***\n*** FAILED: Expected error message not found\n***" RET=1 From 9ed216f74227064ddcbc4ad46f2b7a445a3623f9 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 13 Oct 2025 23:20:27 +0530 Subject: [PATCH 06/18] Fix pre-commit errors --- qa/L0_simple_ensemble/ensemble_backpressure_test.py | 1 + qa/L0_simple_ensemble/models/slow_consumer/1/model.py | 1 + 2 files changed, 2 insertions(+) diff --git a/qa/L0_simple_ensemble/ensemble_backpressure_test.py b/qa/L0_simple_ensemble/ensemble_backpressure_test.py index e2919e5da5..cc01e5c947 100755 --- a/qa/L0_simple_ensemble/ensemble_backpressure_test.py +++ b/qa/L0_simple_ensemble/ensemble_backpressure_test.py @@ -33,6 +33,7 @@ import queue import unittest from functools import partial + import numpy as np import test_util as tu import tritonclient.grpc as grpcclient diff --git a/qa/L0_simple_ensemble/models/slow_consumer/1/model.py b/qa/L0_simple_ensemble/models/slow_consumer/1/model.py index e82b2d8f61..5aac160b46 100644 --- a/qa/L0_simple_ensemble/models/slow_consumer/1/model.py +++ b/qa/L0_simple_ensemble/models/slow_consumer/1/model.py @@ -25,6 +25,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import time + import triton_python_backend_utils as pb_utils From 78698fc3725ba77436f941c454d541650459b9f0 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 13 Oct 2025 23:24:49 +0530 Subject: [PATCH 07/18] Update --- qa/L0_simple_ensemble/ensemble_backpressure_test.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/qa/L0_simple_ensemble/ensemble_backpressure_test.py b/qa/L0_simple_ensemble/ensemble_backpressure_test.py index cc01e5c947..16cd39b255 100755 --- a/qa/L0_simple_ensemble/ensemble_backpressure_test.py +++ b/qa/L0_simple_ensemble/ensemble_backpressure_test.py @@ -38,7 +38,6 @@ import test_util as tu import tritonclient.grpc as grpcclient - SERVER_URL = "localhost:8001" DEFAULT_RESPONSE_TIMEOUT = 60 @@ -226,8 +225,8 @@ def test_backpressure_concurrent_requests(self): try: client.stop_stream() client.close() - except: - pass + except Exception as e: + print(f"Exception during client cleanup: {e}") if __name__ == "__main__": From 8665a0df24b2b38e73af9b5ecbef9019788eb791 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 13 Oct 2025 23:31:02 +0530 Subject: [PATCH 08/18] Update --- qa/L0_simple_ensemble/test.sh | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/qa/L0_simple_ensemble/test.sh b/qa/L0_simple_ensemble/test.sh index 7d4c33967f..2baa1a8ac1 100755 --- a/qa/L0_simple_ensemble/test.sh +++ b/qa/L0_simple_ensemble/test.sh @@ -202,14 +202,14 @@ parameters: { EOF -SERVER_LOG="./not_on_path_server.log" +SERVER_LOG="./invalid_max_ensemble_inflight_responses_server.log" rm -f $SERVER_LOG run_server -if [ "$SERVER_PID" != "0" ]; then - echo -e "\n***\n*** FAILED: unexpected success starting $SERVER\n***" - kill_server - RET=1 +if [ "$SERVER_PID" == "0" ]; then + echo -e "\n***\n*** Failed to start $SERVER\n***" + cat $SERVER_LOG + exit 1 fi set +e From 0258edaa8c4e5ec73f10074f2f998bcf4a477a8d Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Mon, 13 Oct 2025 23:41:08 +0530 Subject: [PATCH 09/18] Update --- qa/L0_simple_ensemble/test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qa/L0_simple_ensemble/test.sh b/qa/L0_simple_ensemble/test.sh index 2baa1a8ac1..6dde3d7c17 100755 --- a/qa/L0_simple_ensemble/test.sh +++ b/qa/L0_simple_ensemble/test.sh @@ -226,7 +226,7 @@ if ! grep -q "Ignoring 'max_ensemble_inflight_responses' for ensemble model 'ens fi # Verify invalid string was rejected -if ! grep -q "Failed to parse 'max_ensemble_inflight_responses' for ensemble 'ensemble_invalid_string_limit': stoll" $SERVER_LOG; then +if ! grep -q "Failed to parse 'max_ensemble_inflight_responses' for ensemble 'ensemble_invalid_string_limit'" $SERVER_LOG; then echo -e "\n***\n*** FAILED: Expected error message not found\n***" RET=1 fi From 81561fd6ec42b689c44de89eb4645bd3d794547f Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Tue, 14 Oct 2025 18:54:06 +0530 Subject: [PATCH 10/18] Remove duplicate code and add request cancellation test --- .../ensemble_backpressure_test.py | 178 +++++++++++------- qa/L0_simple_ensemble/test.sh | 2 +- 2 files changed, 110 insertions(+), 70 deletions(-) diff --git a/qa/L0_simple_ensemble/ensemble_backpressure_test.py b/qa/L0_simple_ensemble/ensemble_backpressure_test.py index 16cd39b255..9b61c1ee28 100755 --- a/qa/L0_simple_ensemble/ensemble_backpressure_test.py +++ b/qa/L0_simple_ensemble/ensemble_backpressure_test.py @@ -33,13 +33,18 @@ import queue import unittest from functools import partial +from contextlib import ExitStack +import time import numpy as np import test_util as tu import tritonclient.grpc as grpcclient +from tritonclient.utils import InferenceServerException SERVER_URL = "localhost:8001" DEFAULT_RESPONSE_TIMEOUT = 60 +MODEL_ENSEMBLE_ENABLED = "ensemble_enabled_max_inflight_responses" +MODEL_ENSEMBLE_DISABLED = "ensemble_disabled_max_inflight_responses" class UserData: @@ -86,8 +91,8 @@ def _collect_responses(self, user_data): result, Exception, f"Callback returned an exception: {result}" ) - # Add response to list if it has data (not empty final-only response) response = result.get_response() + # Add response to list if it has data (not empty final-only response) if len(response.outputs) > 0: responses.append(result) @@ -98,29 +103,21 @@ def _collect_responses(self, user_data): return responses - def test_backpressure_limits_inflight(self): + def _run_inference(self, model_name, expected_count): """ - Test that max_ensemble_inflight_responses correctly limits concurrent - responses and prevents unbounded memory growth. + Helper function to run inference and verify responses. """ - model_name = "ensemble_enabled_max_inflight_responses" - expected_count = 16 user_data = UserData() - with grpcclient.InferenceServerClient(SERVER_URL) as triton_client: try: inputs, outputs = self._prepare_infer_args(expected_count) - triton_client.start_stream(callback=partial(callback, user_data)) - triton_client.async_stream_infer( model_name=model_name, inputs=inputs, outputs=outputs ) - # Collect responses + # Collect and verify responses responses = self._collect_responses(user_data) - - # Verify we received the expected number of responses self.assertEqual( len(responses), expected_count, @@ -133,77 +130,46 @@ def test_backpressure_limits_inflight(self): self.assertEqual( output[0], idx, f"Response {idx} has incorrect value" ) - finally: triton_client.stop_stream() - def test_backpressure_disabled(self): + def test_backpressure_limits_inflight(self): """ - Test that ensemble model without max_ensemble_inflight_responses parameter - works fine (original behavior). + Test that max_ensemble_inflight_responses correctly limits concurrent + responses. """ - model_name = "ensemble_disabled_max_inflight_responses" - expected_count = 16 - user_data = UserData() - - with grpcclient.InferenceServerClient(SERVER_URL) as triton_client: - try: - inputs, outputs = self._prepare_infer_args(expected_count) - - triton_client.start_stream(callback=partial(callback, user_data)) - - triton_client.async_stream_infer( - model_name=model_name, inputs=inputs, outputs=outputs - ) - - # Collect responses - responses = self._collect_responses(user_data) - - # Verify we received the expected number of responses - self.assertEqual( - len(responses), - expected_count, - f"Expected {expected_count} responses, got {len(responses)}", - ) - - # Verify correctness of responses - for idx, resp in enumerate(responses): - output = resp.as_numpy("OUT") - self.assertEqual( - output[0], idx, f"Response {idx} has incorrect value" - ) + self._run_inference(model_name=MODEL_ENSEMBLE_ENABLED, expected_count=32) - finally: - triton_client.stop_stream() + def test_backpressure_disabled(self): + """ + Test that an ensemble model without max_ensemble_inflight_responses parameter works correctly. + """ + self._run_inference(model_name=MODEL_ENSEMBLE_DISABLED, expected_count=32) def test_backpressure_concurrent_requests(self): """ Test that backpressure works correctly with multiple concurrent requests. Each request should have independent backpressure state. """ - model_name = "ensemble_enabled_max_inflight_responses" num_concurrent = 8 expected_per_request = 8 + user_datas = [UserData() for _ in range(num_concurrent)] - clients = [] - user_datas = [] + with ExitStack() as stack: + clients = [ + stack.enter_context(grpcclient.InferenceServerClient(SERVER_URL)) + for _ in range(num_concurrent) + ] - try: inputs, outputs = self._prepare_infer_args(expected_per_request) - # Create separate client for each concurrent request + # Start all concurrent requests for i in range(num_concurrent): - client = grpcclient.InferenceServerClient(SERVER_URL) - user_data = UserData() - - client.start_stream(callback=partial(callback, user_data)) - client.async_stream_infer( - model_name=model_name, inputs=inputs, outputs=outputs + clients[i].start_stream(callback=partial(callback, user_datas[i])) + clients[i].async_stream_infer( + model_name=MODEL_ENSEMBLE_ENABLED, inputs=inputs, outputs=outputs ) - clients.append(client) - user_datas.append(user_data) - # Collect and verify responses for all requests for i, ud in enumerate(user_datas): responses = self._collect_responses(ud) @@ -212,21 +178,95 @@ def test_backpressure_concurrent_requests(self): expected_per_request, f"Request {i}: expected {expected_per_request} responses, got {len(responses)}", ) - # Verify correctness of responses for idx, resp in enumerate(responses): output = resp.as_numpy("OUT") self.assertEqual( - output[0], idx, f"Response {idx} has incorrect value" + output[0], + idx, + f"Response {idx} for request {i} has incorrect value", ) - finally: + # Stop all streams for client in clients: + client.stop_stream() + + def test_backpressure_request_cancellation(self): + """ + Test that cancellation unblocks producers waiting on backpressure and that + the client receives a cancellation error. + """ + # Use a large count to ensure the producer gets blocked by backpressure. + # The model is configured with max_ensemble_inflight_responses = 4. + input_value = 32 + user_data = UserData() + + with grpcclient.InferenceServerClient(SERVER_URL) as triton_client: + inputs, outputs = self._prepare_infer_args(input_value) + triton_client.start_stream(callback=partial(callback, user_data)) + + # Start the request + triton_client.async_stream_infer( + model_name=MODEL_ENSEMBLE_ENABLED, inputs=inputs, outputs=outputs + ) + + responses = [] + try: + result = user_data._response_queue.get(timeout=5) + if isinstance(result, InferenceServerException): + self.fail(f"Got error before cancellation: {result}") + resp = result.get_response() + if len(resp.outputs) > 0: + responses.append(result) + except queue.Empty: + self.fail("Stream did not produce any response before cancellation.") + + # Cancel the stream. This should unblock any waiting producers and result in a CANCELLED error. + triton_client.stop_stream(cancel_requests=True) + + # Allow some time for cancellation + time.sleep(1) + + cancellation_found = False + while True: try: - client.stop_stream() - client.close() - except Exception as e: - print(f"Exception during client cleanup: {e}") + result = user_data._response_queue.get(timeout=1) + if isinstance(result, InferenceServerException): + self.assertEqual( + result.status(), + "StatusCode.CANCELLED", + f"Expected CANCELLED status, got: {result.status()}", + ) + cancellation_found = True + break + else: + response = result.get_response() + if len(response.outputs) > 0: + responses.append(result) + # Check for final response + final = response.parameters.get("triton_final_response") + if final and final.bool_param: + break + except queue.Empty: + break + + # Verify the cancellation error was received + self.assertTrue( + cancellation_found, + "Did not receive the expected cancellation error from the server.", + ) + + # Verify we received only a partial set of responses + self.assertLess( + len(responses), + input_value, + "Expected partial responses due to cancellation, but received all of them.", + ) + self.assertGreater( + len(responses), + 0, + "Expected to receive at least one response before cancellation.", + ) if __name__ == "__main__": diff --git a/qa/L0_simple_ensemble/test.sh b/qa/L0_simple_ensemble/test.sh index 6dde3d7c17..db0a11d81f 100755 --- a/qa/L0_simple_ensemble/test.sh +++ b/qa/L0_simple_ensemble/test.sh @@ -168,7 +168,7 @@ python $BACKPRESSURE_TEST_PY -v >> $CLIENT_LOG 2>&1 if [ $? -ne 0 ]; then RET=1 else - check_test_results $TEST_RESULT_FILE 3 + check_test_results $TEST_RESULT_FILE 4 if [ $? -ne 0 ]; then cat $CLIENT_LOG echo -e "\n***\n*** Test Result Verification Failed\n***" From 10dacec560cc46feee843720b643ca8aa19fffe8 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Tue, 14 Oct 2025 18:56:23 +0530 Subject: [PATCH 11/18] Fix pre-commit --- qa/L0_simple_ensemble/ensemble_backpressure_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/qa/L0_simple_ensemble/ensemble_backpressure_test.py b/qa/L0_simple_ensemble/ensemble_backpressure_test.py index 9b61c1ee28..a3643b31a8 100755 --- a/qa/L0_simple_ensemble/ensemble_backpressure_test.py +++ b/qa/L0_simple_ensemble/ensemble_backpressure_test.py @@ -30,12 +30,12 @@ sys.path.append("../common") +import time import queue import unittest -from functools import partial from contextlib import ExitStack +from functools import partial -import time import numpy as np import test_util as tu import tritonclient.grpc as grpcclient From e2e48a37d8b3d44e624934aafd459dd47c82c977 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Tue, 14 Oct 2025 18:58:31 +0530 Subject: [PATCH 12/18] Fix pre-commit --- qa/L0_simple_ensemble/ensemble_backpressure_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qa/L0_simple_ensemble/ensemble_backpressure_test.py b/qa/L0_simple_ensemble/ensemble_backpressure_test.py index a3643b31a8..3ba0dbe7ef 100755 --- a/qa/L0_simple_ensemble/ensemble_backpressure_test.py +++ b/qa/L0_simple_ensemble/ensemble_backpressure_test.py @@ -30,8 +30,8 @@ sys.path.append("../common") -import time import queue +import time import unittest from contextlib import ExitStack from functools import partial From f8f1468ca578009c30ceed8699b157ec48a6bd43 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Tue, 14 Oct 2025 20:01:43 +0530 Subject: [PATCH 13/18] Update --- qa/L0_simple_ensemble/test.sh | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/qa/L0_simple_ensemble/test.sh b/qa/L0_simple_ensemble/test.sh index db0a11d81f..a57ee41a20 100755 --- a/qa/L0_simple_ensemble/test.sh +++ b/qa/L0_simple_ensemble/test.sh @@ -220,18 +220,21 @@ if ! grep -q "Ensemble model 'ensemble_enabled_max_inflight_responses' configure fi # Verify negative value was rejected -if ! grep -q "Ignoring 'max_ensemble_inflight_responses' for ensemble model 'ensemble_invalid_negative_limit': value must be positive, got -5" $SERVER_LOG; then +if ! grep -q "Ensemble model 'ensemble_invalid_negative_limit': max_ensemble_inflight_responses must be greater than 0. Received '-5'. Falling back to default value (0)." $SERVER_LOG; then echo -e "\n***\n*** FAILED: Expected error message not found\n***" RET=1 fi # Verify invalid string was rejected -if ! grep -q "Failed to parse 'max_ensemble_inflight_responses' for ensemble 'ensemble_invalid_string_limit'" $SERVER_LOG; then +if ! grep -q "Ensemble model 'ensemble_invalid_string_limit': failed to parse max_ensemble_inflight_responses='invalid_value'. Falling back to default value (0)." $SERVER_LOG; then echo -e "\n***\n*** FAILED: Expected error message not found\n***" RET=1 fi set -e +kill $SERVER_PID +wait $SERVER_PID + if [ $RET -eq 0 ]; then echo -e "\n***\n*** Test Passed\n***" else From 3d8b848ff0471ec462a521a99703ab4c5e08cb48 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Wed, 15 Oct 2025 00:20:01 +0530 Subject: [PATCH 14/18] Update --- qa/L0_simple_ensemble/test.sh | 40 +++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/qa/L0_simple_ensemble/test.sh b/qa/L0_simple_ensemble/test.sh index a57ee41a20..9988a95ce7 100755 --- a/qa/L0_simple_ensemble/test.sh +++ b/qa/L0_simple_ensemble/test.sh @@ -184,6 +184,7 @@ wait $SERVER_PID # Test invalid values for max_ensemble_inflight_responses parameter mkdir -p `pwd`/models/ensemble_invalid_negative_limit/1 mkdir -p `pwd`/models/ensemble_invalid_string_limit/1 +mkdir -p `pwd`/models/ensemble_large_value_limit/1 cp `pwd`/models/ensemble_disabled_max_inflight_responses/config.pbtxt `pwd`/models/ensemble_invalid_negative_limit/ cat <> `pwd`/models/ensemble_invalid_negative_limit/config.pbtxt @@ -201,39 +202,52 @@ parameters: { } EOF +cp `pwd`/models/ensemble_disabled_max_inflight_responses/config.pbtxt `pwd`/models/ensemble_large_value_limit/ +cat <> `pwd`/models/ensemble_large_value_limit/config.pbtxt +parameters: { + key: "max_ensemble_inflight_responses" + value: { string_value: "12345678901" } +} +EOF SERVER_LOG="./invalid_max_ensemble_inflight_responses_server.log" rm -f $SERVER_LOG run_server -if [ "$SERVER_PID" == "0" ]; then - echo -e "\n***\n*** Failed to start $SERVER\n***" +if [ "$SERVER_PID" != "0" ]; then + echo -e "\n***\n*** FAILED: unexpected success starting $SERVER\n***" + kill $SERVER_PID + wait $SERVER_PID cat $SERVER_LOG - exit 1 + RET=1 fi set +e -# Verify valid config was parsed correctly +# Verify valid config was loaded successfully if ! grep -q "Ensemble model 'ensemble_enabled_max_inflight_responses' configured with max_ensemble_inflight_responses: 4" $SERVER_LOG; then - echo -e "\n***\n*** FAILED: Expected configuration message not found\n***" + echo -e "\n***\n*** FAILED: Valid model did not load successfully\n***" + RET=1 +fi + +# Verify negative value caused model load failure +if ! grep -q "Invalid argument: Invalid 'max_ensemble_inflight_responses' for ensemble model 'ensemble_invalid_negative_limit': value must be positive, got -5" $SERVER_LOG; then + echo -e "\n***\n*** FAILED: Negative value should fail model load\n***" RET=1 fi -# Verify negative value was rejected -if ! grep -q "Ensemble model 'ensemble_invalid_negative_limit': max_ensemble_inflight_responses must be greater than 0. Received '-5'. Falling back to default value (0)." $SERVER_LOG; then - echo -e "\n***\n*** FAILED: Expected error message not found\n***" +# Verify invalid string caused model load failure +if ! grep -q "Invalid argument: Invalid 'max_ensemble_inflight_responses' for ensemble model 'ensemble_invalid_string_limit': cannot parse value 'invalid_value'" $SERVER_LOG; then + echo -e "\n***\n*** FAILED: Invalid string should fail model load\n***" RET=1 fi -# Verify invalid string was rejected -if ! grep -q "Ensemble model 'ensemble_invalid_string_limit': failed to parse max_ensemble_inflight_responses='invalid_value'. Falling back to default value (0)." $SERVER_LOG; then - echo -e "\n***\n*** FAILED: Expected error message not found\n***" +# Verify very large value caused model load failure +if ! grep -q "Invalid argument: Invalid 'max_ensemble_inflight_responses' for ensemble model 'ensemble_large_value_limit': value exceeds maximum allowed (2147483647)" $SERVER_LOG; then + echo -e "\n***\n*** FAILED: Large value should fail model load\n***" RET=1 fi set -e -kill $SERVER_PID -wait $SERVER_PID if [ $RET -eq 0 ]; then echo -e "\n***\n*** Test Passed\n***" From 4a1a8fef591542339377370784ff1ed929edbf3b Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Wed, 15 Oct 2025 15:46:14 +0530 Subject: [PATCH 15/18] Improve model preparation --- .../decoupled_producer/1/model.py | 4 +- .../decoupled_producer/config.pbtxt | 2 +- .../config.pbtxt | 6 +- .../ensemble_backpressure_test.py | 9 +- .../config.pbtxt | 82 ------------------- .../models/slow_consumer/1/model.py | 51 ------------ .../models/slow_consumer/config.pbtxt | 54 ------------ qa/L0_simple_ensemble/test.sh | 75 +++++++++++------ 8 files changed, 61 insertions(+), 222 deletions(-) rename qa/L0_simple_ensemble/{models => backpressure_test_models}/decoupled_producer/1/model.py (93%) rename qa/L0_simple_ensemble/{models => backpressure_test_models}/decoupled_producer/config.pbtxt (98%) rename qa/L0_simple_ensemble/{models => backpressure_test_models}/ensemble_disabled_max_inflight_responses/config.pbtxt (96%) delete mode 100644 qa/L0_simple_ensemble/models/ensemble_enabled_max_inflight_responses/config.pbtxt delete mode 100644 qa/L0_simple_ensemble/models/slow_consumer/1/model.py delete mode 100644 qa/L0_simple_ensemble/models/slow_consumer/config.pbtxt diff --git a/qa/L0_simple_ensemble/models/decoupled_producer/1/model.py b/qa/L0_simple_ensemble/backpressure_test_models/decoupled_producer/1/model.py similarity index 93% rename from qa/L0_simple_ensemble/models/decoupled_producer/1/model.py rename to qa/L0_simple_ensemble/backpressure_test_models/decoupled_producer/1/model.py index 14199c72dc..fc8ff2f691 100644 --- a/qa/L0_simple_ensemble/models/decoupled_producer/1/model.py +++ b/qa/L0_simple_ensemble/backpressure_test_models/decoupled_producer/1/model.py @@ -42,9 +42,9 @@ def execute(self, requests): response_sender = request.get_response_sender() - # Produce 'count' responses + # Produce 'count' responses, each with 0.5 as the output value for i in range(count): - out_tensor = pb_utils.Tensor("OUT", np.array([i], dtype=np.int32)) + out_tensor = pb_utils.Tensor("OUT", np.array([0.5], dtype=np.float32)) response = pb_utils.InferenceResponse(output_tensors=[out_tensor]) response_sender.send(response) diff --git a/qa/L0_simple_ensemble/models/decoupled_producer/config.pbtxt b/qa/L0_simple_ensemble/backpressure_test_models/decoupled_producer/config.pbtxt similarity index 98% rename from qa/L0_simple_ensemble/models/decoupled_producer/config.pbtxt rename to qa/L0_simple_ensemble/backpressure_test_models/decoupled_producer/config.pbtxt index 23f14f9be5..4ef3a444ce 100644 --- a/qa/L0_simple_ensemble/models/decoupled_producer/config.pbtxt +++ b/qa/L0_simple_ensemble/backpressure_test_models/decoupled_producer/config.pbtxt @@ -40,7 +40,7 @@ input [ output [ { name: "OUT" - data_type: TYPE_INT32 + data_type: TYPE_FP32 dims: [ 1 ] } ] diff --git a/qa/L0_simple_ensemble/models/ensemble_disabled_max_inflight_responses/config.pbtxt b/qa/L0_simple_ensemble/backpressure_test_models/ensemble_disabled_max_inflight_responses/config.pbtxt similarity index 96% rename from qa/L0_simple_ensemble/models/ensemble_disabled_max_inflight_responses/config.pbtxt rename to qa/L0_simple_ensemble/backpressure_test_models/ensemble_disabled_max_inflight_responses/config.pbtxt index c8566dda41..804299cb21 100644 --- a/qa/L0_simple_ensemble/models/ensemble_disabled_max_inflight_responses/config.pbtxt +++ b/qa/L0_simple_ensemble/backpressure_test_models/ensemble_disabled_max_inflight_responses/config.pbtxt @@ -39,7 +39,7 @@ input [ output [ { name: "OUT" - data_type: TYPE_INT32 + data_type: TYPE_FP32 dims: [ 1 ] } ] @@ -62,11 +62,11 @@ ensemble_scheduling { model_name: "slow_consumer" model_version: -1 input_map { - key: "IN" + key: "INPUT0" value: "intermediate" } output_map { - key: "OUT" + key: "OUTPUT0" value: "OUT" } } diff --git a/qa/L0_simple_ensemble/ensemble_backpressure_test.py b/qa/L0_simple_ensemble/ensemble_backpressure_test.py index 3ba0dbe7ef..61a3631abf 100755 --- a/qa/L0_simple_ensemble/ensemble_backpressure_test.py +++ b/qa/L0_simple_ensemble/ensemble_backpressure_test.py @@ -43,6 +43,7 @@ SERVER_URL = "localhost:8001" DEFAULT_RESPONSE_TIMEOUT = 60 +EXPECTED_INFER_OUTPUT = 0.5 MODEL_ENSEMBLE_ENABLED = "ensemble_enabled_max_inflight_responses" MODEL_ENSEMBLE_DISABLED = "ensemble_disabled_max_inflight_responses" @@ -128,7 +129,9 @@ def _run_inference(self, model_name, expected_count): for idx, resp in enumerate(responses): output = resp.as_numpy("OUT") self.assertEqual( - output[0], idx, f"Response {idx} has incorrect value" + output[0], + EXPECTED_INFER_OUTPUT, + msg=f"Response {idx} has - {output[0]}", ) finally: triton_client.stop_stream() @@ -183,8 +186,8 @@ def test_backpressure_concurrent_requests(self): output = resp.as_numpy("OUT") self.assertEqual( output[0], - idx, - f"Response {idx} for request {i} has incorrect value", + EXPECTED_INFER_OUTPUT, + msg=f"Response {idx} for request {i} has incorrect value - {output[0]}", ) # Stop all streams diff --git a/qa/L0_simple_ensemble/models/ensemble_enabled_max_inflight_responses/config.pbtxt b/qa/L0_simple_ensemble/models/ensemble_enabled_max_inflight_responses/config.pbtxt deleted file mode 100644 index 51fa868f14..0000000000 --- a/qa/L0_simple_ensemble/models/ensemble_enabled_max_inflight_responses/config.pbtxt +++ /dev/null @@ -1,82 +0,0 @@ -# Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# * Neither the name of NVIDIA CORPORATION nor the names of its -# contributors may be used to endorse or promote products derived -# from this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY -# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY -# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -platform: "ensemble" -max_batch_size: 0 - -# Backpressure configuration - limits inflight responses from -# decoupled step to avoid unbounded memory growth -parameters: { - key: "max_ensemble_inflight_responses" - value: { string_value: "4" } -} - -input [ - { - name: "IN" - data_type: TYPE_INT32 - dims: [ 1 ] - } -] - -output [ - { - name: "OUT" - data_type: TYPE_INT32 - dims: [ 1 ] - } -] - -ensemble_scheduling { - step [ - { - model_name: "decoupled_producer" - model_version: -1 - input_map { - key: "IN" - value: "IN" - } - output_map { - key: "OUT" - value: "intermediate" - } - }, - { - model_name: "slow_consumer" - model_version: -1 - input_map { - key: "IN" - value: "intermediate" - } - output_map { - key: "OUT" - value: "OUT" - } - } - ] -} - diff --git a/qa/L0_simple_ensemble/models/slow_consumer/1/model.py b/qa/L0_simple_ensemble/models/slow_consumer/1/model.py deleted file mode 100644 index 5aac160b46..0000000000 --- a/qa/L0_simple_ensemble/models/slow_consumer/1/model.py +++ /dev/null @@ -1,51 +0,0 @@ -# Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# * Neither the name of NVIDIA CORPORATION nor the names of its -# contributors may be used to endorse or promote products derived -# from this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY -# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY -# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import time - -import triton_python_backend_utils as pb_utils - - -class TritonPythonModel: - """ - Slow consumer model - adds delay to simulate slow processing. - """ - - def execute(self, requests): - responses = [] - - for request in requests: - # Add delay (200ms per request) - time.sleep(0.200) - - # Pass through the input - in_tensor = pb_utils.get_input_tensor_by_name(request, "IN") - out_tensor = pb_utils.Tensor("OUT", in_tensor.as_numpy()) - - response = pb_utils.InferenceResponse(output_tensors=[out_tensor]) - responses.append(response) - - return responses diff --git a/qa/L0_simple_ensemble/models/slow_consumer/config.pbtxt b/qa/L0_simple_ensemble/models/slow_consumer/config.pbtxt deleted file mode 100644 index 64b1f1d315..0000000000 --- a/qa/L0_simple_ensemble/models/slow_consumer/config.pbtxt +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# * Neither the name of NVIDIA CORPORATION nor the names of its -# contributors may be used to endorse or promote products derived -# from this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY -# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR -# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY -# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -name: "slow_consumer" -backend: "python" -max_batch_size: 0 - -input [ - { - name: "IN" - data_type: TYPE_INT32 - dims: [ 1 ] - } -] - -output [ - { - name: "OUT" - data_type: TYPE_INT32 - dims: [ 1 ] - } -] - -instance_group [ - { - count: 1 - kind: KIND_CPU - } -] - diff --git a/qa/L0_simple_ensemble/test.sh b/qa/L0_simple_ensemble/test.sh index 9988a95ce7..2e32f13df0 100755 --- a/qa/L0_simple_ensemble/test.sh +++ b/qa/L0_simple_ensemble/test.sh @@ -31,17 +31,16 @@ SIMPLE_TEST_PY=./ensemble_test.py CLIENT_LOG="./client.log" +TEST_MODEL_DIR="`pwd`/models" TEST_RESULT_FILE='test_results.txt' SERVER=/opt/tritonserver/bin/tritonserver -SERVER_ARGS="--model-repository=`pwd`/models" +SERVER_ARGS="--model-repository=${TEST_MODEL_DIR}" SERVER_LOG="./inference_server.log" source ../common/util.sh # ensure ensemble models have version sub-directory -mkdir -p `pwd`/models/ensemble_add_sub_int32_int32_int32/1 -mkdir -p `pwd`/models/ensemble_partial_add_sub/1 -mkdir -p `pwd`/models/ensemble_enabled_max_inflight_responses/1 -mkdir -p `pwd`/models/ensemble_disabled_max_inflight_responses/1 +mkdir -p ${TEST_MODEL_DIR}/ensemble_add_sub_int32_int32_int32/1 +mkdir -p ${TEST_MODEL_DIR}/ensemble_partial_add_sub/1 rm -f $CLIENT_LOG $SERVER_LOG @@ -148,14 +147,29 @@ set -e kill $SERVER_PID wait $SERVER_PID -# Test ensemble backpressure feature (max_ensemble_inflight_responses parameter) +######## Test ensemble backpressure feature (max_ensemble_inflight_responses parameter) +MODEL_DIR="`pwd`/backpressure_test_models" +mkdir -p ${MODEL_DIR}/slow_consumer/1 +cp ../python_models/ground_truth/model.py ${MODEL_DIR}/slow_consumer/1 +cp ../python_models/ground_truth/config.pbtxt ${MODEL_DIR}/slow_consumer/ +sed -i 's/name: "ground_truth"/name: "slow_consumer"/g' ${MODEL_DIR}/slow_consumer/config.pbtxt + +mkdir -p ${MODEL_DIR}/ensemble_disabled_max_inflight_responses/1 +mkdir -p ${MODEL_DIR}/ensemble_enabled_max_inflight_responses/1 +cp ${MODEL_DIR}/ensemble_disabled_max_inflight_responses/config.pbtxt ${MODEL_DIR}/ensemble_enabled_max_inflight_responses/ +cat <> ${MODEL_DIR}/ensemble_enabled_max_inflight_responses/config.pbtxt +parameters: { + key: "max_ensemble_inflight_responses" + value: { string_value: "4" } +} +EOF + +BACKPRESSURE_TEST_PY=./ensemble_backpressure_test.py SERVER_LOG="./ensemble_backpressure_test_server.log" CLIENT_LOG="./ensemble_backpressure_test_client.log" -BACKPRESSURE_TEST_PY=./ensemble_backpressure_test.py - rm -f $SERVER_LOG $CLIENT_LOG -SERVER_ARGS="--model-repository=`pwd`/models" +SERVER_ARGS="--model-repository=${MODEL_DIR}" run_server if [ "$SERVER_PID" == "0" ]; then echo -e "\n***\n*** Failed to start $SERVER\n***" @@ -180,36 +194,51 @@ set -e kill $SERVER_PID wait $SERVER_PID +set +e +# Verify valid config was loaded successfully +if ! grep -q "Ensemble model 'ensemble_enabled_max_inflight_responses' configured with max_ensemble_inflight_responses: 4" $SERVER_LOG; then + echo -e "\n***\n*** FAILED: Valid model did not load successfully\n***" + RET=1 +fi +set -e + + +######## Test invalid values for max_ensemble_inflight_responses parameter +INVALID_PARAM_MODEL_DIR="`pwd`/invalid_param_test_models" +rm -rf ${INVALID_PARAM_MODEL_DIR} + +mkdir -p ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/1 +mkdir -p ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_string_limit/1 +mkdir -p ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/1 -# Test invalid values for max_ensemble_inflight_responses parameter -mkdir -p `pwd`/models/ensemble_invalid_negative_limit/1 -mkdir -p `pwd`/models/ensemble_invalid_string_limit/1 -mkdir -p `pwd`/models/ensemble_large_value_limit/1 +cp -r ${MODEL_DIR}/decoupled_producer ${INVALID_PARAM_MODEL_DIR}/ +cp -r ${MODEL_DIR}/slow_consumer ${INVALID_PARAM_MODEL_DIR}/ -cp `pwd`/models/ensemble_disabled_max_inflight_responses/config.pbtxt `pwd`/models/ensemble_invalid_negative_limit/ -cat <> `pwd`/models/ensemble_invalid_negative_limit/config.pbtxt +cp ${MODEL_DIR}/ensemble_disabled_max_inflight_responses/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/ +cat <> ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/config.pbtxt parameters: { key: "max_ensemble_inflight_responses" value: { string_value: "-5" } } EOF -cp `pwd`/models/ensemble_disabled_max_inflight_responses/config.pbtxt `pwd`/models/ensemble_invalid_string_limit/ -cat <> `pwd`/models/ensemble_invalid_string_limit/config.pbtxt +cp ${MODEL_DIR}/ensemble_disabled_max_inflight_responses/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_string_limit/ +cat <> ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_string_limit/config.pbtxt parameters: { key: "max_ensemble_inflight_responses" value: { string_value: "invalid_value" } } EOF -cp `pwd`/models/ensemble_disabled_max_inflight_responses/config.pbtxt `pwd`/models/ensemble_large_value_limit/ -cat <> `pwd`/models/ensemble_large_value_limit/config.pbtxt +cp ${MODEL_DIR}/ensemble_disabled_max_inflight_responses/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/ +cat <> ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/config.pbtxt parameters: { key: "max_ensemble_inflight_responses" value: { string_value: "12345678901" } } EOF +SERVER_ARGS="--model-repository=${INVALID_PARAM_MODEL_DIR}" SERVER_LOG="./invalid_max_ensemble_inflight_responses_server.log" rm -f $SERVER_LOG @@ -223,12 +252,6 @@ if [ "$SERVER_PID" != "0" ]; then fi set +e -# Verify valid config was loaded successfully -if ! grep -q "Ensemble model 'ensemble_enabled_max_inflight_responses' configured with max_ensemble_inflight_responses: 4" $SERVER_LOG; then - echo -e "\n***\n*** FAILED: Valid model did not load successfully\n***" - RET=1 -fi - # Verify negative value caused model load failure if ! grep -q "Invalid argument: Invalid 'max_ensemble_inflight_responses' for ensemble model 'ensemble_invalid_negative_limit': value must be positive, got -5" $SERVER_LOG; then echo -e "\n***\n*** FAILED: Negative value should fail model load\n***" @@ -242,7 +265,7 @@ if ! grep -q "Invalid argument: Invalid 'max_ensemble_inflight_responses' for en fi # Verify very large value caused model load failure -if ! grep -q "Invalid argument: Invalid 'max_ensemble_inflight_responses' for ensemble model 'ensemble_large_value_limit': value exceeds maximum allowed (2147483647)" $SERVER_LOG; then +if ! grep -q "Invalid argument: Invalid 'max_ensemble_inflight_responses' for ensemble model 'ensemble_invalid_large_value_limit': value exceeds maximum allowed (2147483647)" $SERVER_LOG; then echo -e "\n***\n*** FAILED: Large value should fail model load\n***" RET=1 fi From 554e1b9487043707152afa379c503eeee555e7fe Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Sat, 18 Oct 2025 00:22:52 +0530 Subject: [PATCH 16/18] Update tests --- .../ensemble_backpressure_test.py | 8 +-- qa/L0_simple_ensemble/test.sh | 59 +++++++++---------- 2 files changed, 31 insertions(+), 36 deletions(-) diff --git a/qa/L0_simple_ensemble/ensemble_backpressure_test.py b/qa/L0_simple_ensemble/ensemble_backpressure_test.py index 61a3631abf..802f6b8da3 100755 --- a/qa/L0_simple_ensemble/ensemble_backpressure_test.py +++ b/qa/L0_simple_ensemble/ensemble_backpressure_test.py @@ -62,7 +62,7 @@ def callback(user_data, result, error): class EnsembleBackpressureTest(tu.TestResultCollector): """ - Tests for ensemble backpressure feature (max_ensemble_inflight_responses). + Tests for ensemble backpressure feature (max_inflight_responses). """ def _prepare_infer_args(self, input_value): @@ -138,14 +138,14 @@ def _run_inference(self, model_name, expected_count): def test_backpressure_limits_inflight(self): """ - Test that max_ensemble_inflight_responses correctly limits concurrent + Test that max_inflight_responses correctly limits concurrent responses. """ self._run_inference(model_name=MODEL_ENSEMBLE_ENABLED, expected_count=32) def test_backpressure_disabled(self): """ - Test that an ensemble model without max_ensemble_inflight_responses parameter works correctly. + Test that an ensemble model without max_inflight_responses parameter works correctly. """ self._run_inference(model_name=MODEL_ENSEMBLE_DISABLED, expected_count=32) @@ -200,7 +200,7 @@ def test_backpressure_request_cancellation(self): the client receives a cancellation error. """ # Use a large count to ensure the producer gets blocked by backpressure. - # The model is configured with max_ensemble_inflight_responses = 4. + # The model is configured with max_inflight_responses = 4. input_value = 32 user_data = UserData() diff --git a/qa/L0_simple_ensemble/test.sh b/qa/L0_simple_ensemble/test.sh index 2e32f13df0..a1737c5f98 100755 --- a/qa/L0_simple_ensemble/test.sh +++ b/qa/L0_simple_ensemble/test.sh @@ -147,22 +147,22 @@ set -e kill $SERVER_PID wait $SERVER_PID -######## Test ensemble backpressure feature (max_ensemble_inflight_responses parameter) +######## Test ensemble backpressure feature (max_inflight_responses parameter) MODEL_DIR="`pwd`/backpressure_test_models" +mkdir -p ${MODEL_DIR}/ensemble_disabled_max_inflight_responses/1 + +rm -rf ${MODEL_DIR}/slow_consumer mkdir -p ${MODEL_DIR}/slow_consumer/1 cp ../python_models/ground_truth/model.py ${MODEL_DIR}/slow_consumer/1 cp ../python_models/ground_truth/config.pbtxt ${MODEL_DIR}/slow_consumer/ sed -i 's/name: "ground_truth"/name: "slow_consumer"/g' ${MODEL_DIR}/slow_consumer/config.pbtxt -mkdir -p ${MODEL_DIR}/ensemble_disabled_max_inflight_responses/1 +# Create ensemble with backpressure enabled (limit = 4) +rm -rf ${MODEL_DIR}/ensemble_enabled_max_inflight_responses mkdir -p ${MODEL_DIR}/ensemble_enabled_max_inflight_responses/1 cp ${MODEL_DIR}/ensemble_disabled_max_inflight_responses/config.pbtxt ${MODEL_DIR}/ensemble_enabled_max_inflight_responses/ -cat <> ${MODEL_DIR}/ensemble_enabled_max_inflight_responses/config.pbtxt -parameters: { - key: "max_ensemble_inflight_responses" - value: { string_value: "4" } -} -EOF +sed -i 's/ensemble_scheduling {/ensemble_scheduling {\n max_inflight_responses: 4/g' \ + ${MODEL_DIR}/ensemble_enabled_max_inflight_responses/config.pbtxt BACKPRESSURE_TEST_PY=./ensemble_backpressure_test.py SERVER_LOG="./ensemble_backpressure_test_server.log" @@ -196,14 +196,14 @@ wait $SERVER_PID set +e # Verify valid config was loaded successfully -if ! grep -q "Ensemble model 'ensemble_enabled_max_inflight_responses' configured with max_ensemble_inflight_responses: 4" $SERVER_LOG; then +if ! grep -q "Ensemble model 'ensemble_enabled_max_inflight_responses' configured with max_inflight_responses: 4" $SERVER_LOG; then echo -e "\n***\n*** FAILED: Valid model did not load successfully\n***" RET=1 fi set -e -######## Test invalid values for max_ensemble_inflight_responses parameter +######## Test invalid protobuf value (negative uint32 should fail at parse time) INVALID_PARAM_MODEL_DIR="`pwd`/invalid_param_test_models" rm -rf ${INVALID_PARAM_MODEL_DIR} @@ -214,32 +214,27 @@ mkdir -p ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/1 cp -r ${MODEL_DIR}/decoupled_producer ${INVALID_PARAM_MODEL_DIR}/ cp -r ${MODEL_DIR}/slow_consumer ${INVALID_PARAM_MODEL_DIR}/ +# max_inflight_responses = -5 cp ${MODEL_DIR}/ensemble_disabled_max_inflight_responses/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/ -cat <> ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/config.pbtxt -parameters: { - key: "max_ensemble_inflight_responses" - value: { string_value: "-5" } -} -EOF +sed -i 's/ensemble_scheduling {/ensemble_scheduling {\n max_inflight_responses: -5/g' \ + ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/config.pbtxt +# max_inflight_responses = "invalid_value" cp ${MODEL_DIR}/ensemble_disabled_max_inflight_responses/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_string_limit/ -cat <> ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_string_limit/config.pbtxt -parameters: { - key: "max_ensemble_inflight_responses" - value: { string_value: "invalid_value" } -} -EOF +sed -i 's/ensemble_scheduling {/ensemble_scheduling {\n max_inflight_responses: "invalid_value"/g' \ + ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_string_limit/config.pbtxt +# max_inflight_responses = 12345678901 cp ${MODEL_DIR}/ensemble_disabled_max_inflight_responses/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/ -cat <> ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/config.pbtxt -parameters: { - key: "max_ensemble_inflight_responses" - value: { string_value: "12345678901" } -} -EOF +sed -i 's/ensemble_scheduling {/ensemble_scheduling {\n max_inflight_responses: 12345678901/g' \ + ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/config.pbtxt + +cp -r ${MODEL_DIR}/decoupled_producer ${INVALID_PARAM_MODEL_DIR}/ +cp -r ${MODEL_DIR}/slow_consumer ${INVALID_PARAM_MODEL_DIR}/ + SERVER_ARGS="--model-repository=${INVALID_PARAM_MODEL_DIR}" -SERVER_LOG="./invalid_max_ensemble_inflight_responses_server.log" +SERVER_LOG="./invalid_max_inflight_responses_server.log" rm -f $SERVER_LOG run_server @@ -253,19 +248,19 @@ fi set +e # Verify negative value caused model load failure -if ! grep -q "Invalid argument: Invalid 'max_ensemble_inflight_responses' for ensemble model 'ensemble_invalid_negative_limit': value must be positive, got -5" $SERVER_LOG; then +if ! grep -q "Expected integer, got: -" $SERVER_LOG; then echo -e "\n***\n*** FAILED: Negative value should fail model load\n***" RET=1 fi # Verify invalid string caused model load failure -if ! grep -q "Invalid argument: Invalid 'max_ensemble_inflight_responses' for ensemble model 'ensemble_invalid_string_limit': cannot parse value 'invalid_value'" $SERVER_LOG; then +if ! grep -q 'Expected integer, got: "invalid_value"' $SERVER_LOG; then echo -e "\n***\n*** FAILED: Invalid string should fail model load\n***" RET=1 fi # Verify very large value caused model load failure -if ! grep -q "Invalid argument: Invalid 'max_ensemble_inflight_responses' for ensemble model 'ensemble_invalid_large_value_limit': value exceeds maximum allowed (2147483647)" $SERVER_LOG; then +if ! grep -q "Integer out of range (12345678901)" $SERVER_LOG; then echo -e "\n***\n*** FAILED: Large value should fail model load\n***" RET=1 fi From b2ad7359acc1f03fc355f97b548774d1248f3216 Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Sat, 18 Oct 2025 02:05:19 +0530 Subject: [PATCH 17/18] Add documentation --- docs/user_guide/decoupled_models.md | 8 ++- docs/user_guide/ensemble_models.md | 60 +++++++++++++++++++ .../ensemble_backpressure_test.py | 8 ++- qa/L0_simple_ensemble/test.sh | 19 ++---- 4 files changed, 78 insertions(+), 17 deletions(-) diff --git a/docs/user_guide/decoupled_models.md b/docs/user_guide/decoupled_models.md index fbe6f4c298..04b2f99b08 100644 --- a/docs/user_guide/decoupled_models.md +++ b/docs/user_guide/decoupled_models.md @@ -95,7 +95,13 @@ your application should be cognizant that the callback function you registered w `TRITONSERVER_InferenceRequestSetResponseCallback` can be invoked any number of times, each time with a new response. You can take a look at [grpc_server.cc](https://github.com/triton-inference-server/server/blob/main/src/grpc/grpc_server.cc) -### Knowing When a Decoupled Inference Request is Complete +### Using Decoupled Models in Ensembles + +When using decoupled models within an [ensemble](ensemble_models.md), you may encounter unbounded memory growth if a decoupled model produces responses faster than downstream models can consume them. To address this, Triton provides the `max_inflight_responses` configuration field, which limits the number of concurrent inflight responses between ensemble steps. + +For more details and examples, see [Managing Memory Usage in Ensembles with Decoupled Models](ensemble_models.md#managing-memory-usage-in-ensembles-with-decoupled-models). + +## Knowing When a Decoupled Inference Request is Complete An inference request is considered complete when a response containing the `TRITONSERVER_RESPONSE_COMPLETE_FINAL` flag is received from a model/backend. diff --git a/docs/user_guide/ensemble_models.md b/docs/user_guide/ensemble_models.md index 4012ec60c7..eb7c6c26e8 100644 --- a/docs/user_guide/ensemble_models.md +++ b/docs/user_guide/ensemble_models.md @@ -183,6 +183,66 @@ performance, you can use [Model Analyzer](https://github.com/triton-inference-server/model_analyzer) to find the optimal model configurations. +## Managing Memory Usage in Ensembles with Decoupled Models + +An *inflight response* is an intermediate output generated by an upstream model and held in memory until it is consumed by a downstream model within an ensemble pipeline. When an ensemble pipeline contains [decoupled models](decoupled_models.md) that produce responses faster than downstream models can process them, inflight responses accumulate internally and may cause unbounded memory growth. This commonly occurs in data preprocessing pipelines where a fast decoupled model (such as DALI, which efficiently streams and preprocesses data) feeds into a slower inference model (such as ONNX Runtime or TensorRT, which are compute-intensive and operate at a lower throughput). + +Consider an ensemble with two steps: +1. **DALI preprocessor** (decoupled): Produces 100 preprocessed images/sec +2. **ONNX inference model**: Consumes 10 images/sec + +Here, the DALI model produces responses 10× faster than the ONNX model can process them. Without backpressure, these intermediate tensors accumulate in memory, eventually leading to out-of-memory errors. + +The `max_inflight_responses` field in the ensemble configuration limits the number of concurrent inflight responses between ensemble steps per request. +When this limit is reached, faster upstream models are paused (blocked) until downstream models finish processing, effectively preventing unbounded memory growth. + +``` +ensemble_scheduling { + max_inflight_responses: 16 + + step [ + { + model_name: "dali_preprocess" + model_version: -1 + input_map { key: "RAW_IMAGE", value: "IMAGE" } + output_map { key: "PREPROCESSED_IMAGE", value: "preprocessed" } + }, + { + model_name: "onnx_inference" + model_version: -1 + input_map { key: "INPUT", value: "preprocessed" } + output_map { key: "OUTPUT", value: "RESULT" } + } + ] +} +``` + +**Configuration:** +* **`max_inflight_responses: 16`**: For each ensemble request (not globally), at most 16 responses from `dali_preprocess` + can wait for `onnx_inference` to process. Once this per-request limit is reached, `dali_preprocess` is blocked until the downstream step completes a response. +* **Default (`0`)**: No limit - allows unlimited inflight responses (original behavior). + +### When to Use This Feature + +Use `max_inflight_responses` when your ensemble includes: +* **Decoupled models** that produce multiple responses per request +* **Speed mismatch**: Upstream models significantly faster than downstream models +* **Memory constraints**: Limited GPU/CPU memory available + +### Choosing the Right Value + +The optimal value depends on your deployment configuration, including batch size, request rate, available memory, and throughput characteristics.: + +* **Too low** (e.g., 1-2): The producer step is frequently blocked, underutilizing faster models +* **Too high** (e.g., 1000+): Memory usage increases, reducing the effectiveness of backpressure +* **Recommended**: Start with a small value and tune based on memory usage and throughput monitoring + +### Performance Considerations + +* **Zero overhead when disabled**: If `max_inflight_responses: 0` (default), + no synchronization overhead is incurred. +* **Minimal overhead when enabled**: Uses a blocking/wakeup mechanism per ensemble step, where upstream models are paused ("blocked") when the inflight response limit is reached and resumed ("woken up") as downstream models consume responses. This synchronization ensures memory usage stays within bounds, though it may increase latency. + ## Additional Resources You can find additional end-to-end ensemble examples in the links below: diff --git a/qa/L0_simple_ensemble/ensemble_backpressure_test.py b/qa/L0_simple_ensemble/ensemble_backpressure_test.py index 802f6b8da3..fc312db554 100755 --- a/qa/L0_simple_ensemble/ensemble_backpressure_test.py +++ b/qa/L0_simple_ensemble/ensemble_backpressure_test.py @@ -128,10 +128,11 @@ def _run_inference(self, model_name, expected_count): # Verify correctness of responses for idx, resp in enumerate(responses): output = resp.as_numpy("OUT") - self.assertEqual( + self.assertAlmostEqual( output[0], EXPECTED_INFER_OUTPUT, - msg=f"Response {idx} has - {output[0]}", + places=5, + msg=f"Response {idx} has incorrect value - {output[0]}", ) finally: triton_client.stop_stream() @@ -184,9 +185,10 @@ def test_backpressure_concurrent_requests(self): # Verify correctness of responses for idx, resp in enumerate(responses): output = resp.as_numpy("OUT") - self.assertEqual( + self.assertAlmostEqual( output[0], EXPECTED_INFER_OUTPUT, + places=5, msg=f"Response {idx} for request {i} has incorrect value - {output[0]}", ) diff --git a/qa/L0_simple_ensemble/test.sh b/qa/L0_simple_ensemble/test.sh index a1737c5f98..de74e81ee5 100755 --- a/qa/L0_simple_ensemble/test.sh +++ b/qa/L0_simple_ensemble/test.sh @@ -157,7 +157,7 @@ cp ../python_models/ground_truth/model.py ${MODEL_DIR}/slow_consumer/1 cp ../python_models/ground_truth/config.pbtxt ${MODEL_DIR}/slow_consumer/ sed -i 's/name: "ground_truth"/name: "slow_consumer"/g' ${MODEL_DIR}/slow_consumer/config.pbtxt -# Create ensemble with backpressure enabled (limit = 4) +# Create ensemble with "max_inflight_responses = 4" rm -rf ${MODEL_DIR}/ensemble_enabled_max_inflight_responses mkdir -p ${MODEL_DIR}/ensemble_enabled_max_inflight_responses/1 cp ${MODEL_DIR}/ensemble_disabled_max_inflight_responses/config.pbtxt ${MODEL_DIR}/ensemble_enabled_max_inflight_responses/ @@ -203,16 +203,16 @@ fi set -e -######## Test invalid protobuf value (negative uint32 should fail at parse time) +######## Test invalid value for "max_inflight_responses" INVALID_PARAM_MODEL_DIR="`pwd`/invalid_param_test_models" -rm -rf ${INVALID_PARAM_MODEL_DIR} +SERVER_ARGS="--model-repository=${INVALID_PARAM_MODEL_DIR}" +SERVER_LOG="./invalid_max_inflight_responses_server.log" +rm -rf $SERVER_LOG ${INVALID_PARAM_MODEL_DIR} mkdir -p ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/1 mkdir -p ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_string_limit/1 mkdir -p ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/1 - -cp -r ${MODEL_DIR}/decoupled_producer ${INVALID_PARAM_MODEL_DIR}/ -cp -r ${MODEL_DIR}/slow_consumer ${INVALID_PARAM_MODEL_DIR}/ +cp -r ${MODEL_DIR}/decoupled_producer ${MODEL_DIR}/slow_consumer ${INVALID_PARAM_MODEL_DIR}/ # max_inflight_responses = -5 cp ${MODEL_DIR}/ensemble_disabled_max_inflight_responses/config.pbtxt ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_negative_limit/ @@ -229,13 +229,6 @@ cp ${MODEL_DIR}/ensemble_disabled_max_inflight_responses/config.pbtxt ${INVALID_ sed -i 's/ensemble_scheduling {/ensemble_scheduling {\n max_inflight_responses: 12345678901/g' \ ${INVALID_PARAM_MODEL_DIR}/ensemble_invalid_large_value_limit/config.pbtxt -cp -r ${MODEL_DIR}/decoupled_producer ${INVALID_PARAM_MODEL_DIR}/ -cp -r ${MODEL_DIR}/slow_consumer ${INVALID_PARAM_MODEL_DIR}/ - - -SERVER_ARGS="--model-repository=${INVALID_PARAM_MODEL_DIR}" -SERVER_LOG="./invalid_max_inflight_responses_server.log" -rm -f $SERVER_LOG run_server if [ "$SERVER_PID" != "0" ]; then From 977420afe8c004ddedd31ce80830f091d0f8787e Mon Sep 17 00:00:00 2001 From: Sai Kiran Polisetty Date: Sat, 18 Oct 2025 02:06:58 +0530 Subject: [PATCH 18/18] Update copyright --- docs/user_guide/decoupled_models.md | 2 +- docs/user_guide/ensemble_models.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/user_guide/decoupled_models.md b/docs/user_guide/decoupled_models.md index 04b2f99b08..8f4b22ebe2 100644 --- a/docs/user_guide/decoupled_models.md +++ b/docs/user_guide/decoupled_models.md @@ -1,5 +1,5 @@