|
| 1 | +import json |
| 2 | + |
| 3 | +import triton_python_backend_utils as pb_utils |
| 4 | + |
| 5 | + |
| 6 | +def read_parameter_as_type(value, name, pytype=str): |
| 7 | + if value == "": |
| 8 | + return None |
| 9 | + if value.startswith("${") and value.endswith("}"): |
| 10 | + return None |
| 11 | + if pytype is bool: |
| 12 | + return value.lower() in ["1", "true"] |
| 13 | + try: |
| 14 | + result = pytype(value) |
| 15 | + return result |
| 16 | + except: |
| 17 | + pb_utils.Logger.log_warning( |
| 18 | + f"Could not read parameter '{name}' with value '{value}', will use default." |
| 19 | + ) |
| 20 | + return None |
| 21 | + |
| 22 | + |
| 23 | +def get_parameter(model_config, name, pytype=str): |
| 24 | + if name not in model_config['parameters']: |
| 25 | + return None |
| 26 | + return read_parameter_as_type( |
| 27 | + model_config['parameters'][name]['string_value'], name, pytype) |
| 28 | + |
| 29 | + |
| 30 | +class TritonPythonModel: |
| 31 | + """Your Python model must use the same class name. Every Python model |
| 32 | + that is created must have "TritonPythonModel" as the class name. |
| 33 | + """ |
| 34 | + |
| 35 | + def initialize(self, args): |
| 36 | + """`initialize` is called only once when the model is being loaded. |
| 37 | + Implementing `initialize` function is optional. This function allows |
| 38 | + the model to initialize any state associated with this model. |
| 39 | +
|
| 40 | + Parameters |
| 41 | + ---------- |
| 42 | + args : dict |
| 43 | + Both keys and values are strings. The dictionary keys and values are: |
| 44 | + * model_config: A JSON string containing the model configuration |
| 45 | + * model_instance_kind: A string containing model instance kind |
| 46 | + * model_instance_device_id: A string containing model instance device ID |
| 47 | + * model_repository: Model repository path |
| 48 | + * model_version: Model version |
| 49 | + * model_name: Model name |
| 50 | + """ |
| 51 | + model_config = json.loads(args['model_config']) |
| 52 | + self.context_model_name = get_parameter(model_config, |
| 53 | + "context_model_name") |
| 54 | + self.generation_model_name = get_parameter(model_config, |
| 55 | + "generation_model_name") |
| 56 | + self.decoupled = pb_utils.using_decoupled_model_transaction_policy( |
| 57 | + model_config) |
| 58 | + |
| 59 | + def create_context_request(self, request): |
| 60 | + inputs = request.inputs() |
| 61 | + triton_request = pb_utils.InferenceRequest( |
| 62 | + model_name=self.context_model_name, |
| 63 | + inputs=inputs, |
| 64 | + parameters={"request_type": "context_only"}, |
| 65 | + requested_output_names=[]) |
| 66 | + return triton_request |
| 67 | + |
| 68 | + def create_generation_request(self, request, context_response): |
| 69 | + inputs = request.inputs() |
| 70 | + context_phase_params = pb_utils.get_output_tensor_by_name( |
| 71 | + context_response, "context_phase_params") |
| 72 | + if context_phase_params is None: |
| 73 | + raise pb_utils.TritonModelException( |
| 74 | + "Context response must have an output named context phase params" |
| 75 | + ) |
| 76 | + inputs.append(context_phase_params) |
| 77 | + triton_request = pb_utils.InferenceRequest( |
| 78 | + model_name=self.generation_model_name, |
| 79 | + inputs=inputs, |
| 80 | + parameters={"request_type": "generation_only"}, |
| 81 | + requested_output_names=[]) |
| 82 | + return triton_request |
| 83 | + |
| 84 | + def execute(self, requests): |
| 85 | + """`execute` must be implemented in every Python model. `execute` |
| 86 | + function receives a list of pb_utils.InferenceRequest as the only |
| 87 | + argument. This function is called when an inference is requested |
| 88 | + for this model. |
| 89 | +
|
| 90 | + Parameters |
| 91 | + ---------- |
| 92 | + requests : list |
| 93 | + A list of pb_utils.InferenceRequest |
| 94 | +
|
| 95 | + Returns |
| 96 | + ------- |
| 97 | + list |
| 98 | + A list of pb_utils.InferenceResponse. The length of this list must |
| 99 | + be the same as `requests` |
| 100 | + """ |
| 101 | + for request in requests: |
| 102 | + context_request = self.create_context_request(request) |
| 103 | + context_responses = context_request.exec(decoupled=self.decoupled) |
| 104 | + if self.decoupled: |
| 105 | + context_responses = list(context_responses) |
| 106 | + assert len( |
| 107 | + context_responses) == 1, "Expected 1 context response" |
| 108 | + |
| 109 | + if self.decoupled: |
| 110 | + context_response = context_responses[0] |
| 111 | + else: |
| 112 | + context_response = context_responses |
| 113 | + if context_response.has_error(): |
| 114 | + raise pb_utils.TritonModelException( |
| 115 | + f"Context model {self.context_model_name} failed with error: {context_response.error().message()}" |
| 116 | + ) |
| 117 | + generation_request = self.create_generation_request( |
| 118 | + request, context_response) |
| 119 | + |
| 120 | + # TODO(itabrizian): Send the context response to reduce TTFT in decoupled case. |
| 121 | + # It requires adding the generated token to the generation request |
| 122 | + # to avoid sending the first token multiple times. |
| 123 | + responses = generation_request.exec(decoupled=self.decoupled) |
| 124 | + |
| 125 | + if self.decoupled: |
| 126 | + for response in responses: |
| 127 | + if response.has_error(): |
| 128 | + raise pb_utils.TritonModelException( |
| 129 | + f"Generation model {self.generation_model_name} failed with error: {response.error().message()}" |
| 130 | + ) |
| 131 | + request.get_response_sender().send(response) |
| 132 | + |
| 133 | + request.get_response_sender().send( |
| 134 | + flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL) |
| 135 | + else: |
| 136 | + request.get_response_sender().send( |
| 137 | + responses, |
| 138 | + flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL) |
0 commit comments