From de442769610ce26f734ea6d7e56ef335c4314d5a Mon Sep 17 00:00:00 2001 From: wnma3mz Date: Wed, 2 Oct 2024 15:01:04 +0800 Subject: [PATCH] bytes bfloat16 replace float32 --- README.md | 5 ++++- examples/config.json | 4 ++-- tllm/commons/convert.py | 17 +++++++++++++++++ tllm/engine.py | 17 ++++++++++------- tllm/rpc/client.py | 14 +++++++++----- tllm/rpc/manager.py | 2 +- tllm/rpc/schemas.proto | 29 +++++------------------------ tllm/rpc/schemas_pb2.py | 40 +++++++++++++++++++++------------------- tllm/rpc/schemas_pb2.pyi | 22 +++++++++++++++------- 9 files changed, 84 insertions(+), 66 deletions(-) diff --git a/README.md b/README.md index d8b81ae..b4e940f 100644 --- a/README.md +++ b/README.md @@ -53,4 +53,7 @@ bfloat 16 CPU Llama-3.2-1B-Instruct 单机时间:10.96 token/s Llama-3.2-1B-Instruct 单机时间:5.73 token/s(包含首token生成的时间, transformers 框架 TTFT 时间不方便记录) -TODO: Meta-Llama-3-8B-Instruct in GPU \ No newline at end of file +TODO: Meta-Llama-3-8B-Instruct in GPU + +多维数组实现(float32): 单机通信在 0.002 s 左右 (seq-len=1) +bytes 实现(float32): 单机通信在 0.001 s 左右 (seq-len=1) \ No newline at end of file diff --git a/examples/config.json b/examples/config.json index 2f1221d..b1f1693 100644 --- a/examples/config.json +++ b/examples/config.json @@ -4,11 +4,11 @@ "url": "localhost:25001", "tp_size": 2, "master_port": 29501, - "layer_idx": [0, 11] + "layer_idx": [0, 1] }, { "pp_rank": 1, - "url": "192.168.0.101:25002", + "url": "localhost:25002", "tp_size": 2, "master_port": 29502, "layer_idx": [11, 22] diff --git a/tllm/commons/convert.py b/tllm/commons/convert.py index 927e0f3..c08d812 100644 --- a/tllm/commons/convert.py +++ b/tllm/commons/convert.py @@ -1,6 +1,9 @@ from typing import List +import torch + from tllm.rpc import schemas_pb2, schemas_pb2_grpc +from tllm.rpc.schemas_pb2 import BFloat16Tensor def protobuf_to_list(proto_message): @@ -80,3 +83,17 @@ def list_to_protobuf(data: List): raise ValueError("Input data must be a list.") return multi_array_proto + + +def serialize_bfloat16_tensor(tensor) -> BFloat16Tensor: + # TODO: support bfloat16 + tensor_proto = BFloat16Tensor() + tensor_proto.shape.extend(tensor.shape) # 添加形状 + tensor_proto.data = tensor.to(torch.float32).numpy().tobytes() + return tensor_proto + + +def deserialize_bfloat16_tensor(tensor_proto) -> torch.Tensor: + data = torch.frombuffer(tensor_proto.data, dtype=torch.float32).to(torch.bfloat16) + tensor_data = data.view(*tensor_proto.shape) + return tensor_data diff --git a/tllm/engine.py b/tllm/engine.py index e85aa99..e202747 100644 --- a/tllm/engine.py +++ b/tllm/engine.py @@ -9,12 +9,14 @@ from transformers import AutoConfig from transformers.models.llama.modeling_llama import LlamaRMSNorm +from tllm.commons.convert import deserialize_bfloat16_tensor, serialize_bfloat16_tensor from tllm.generate.decode_utils import DecodeUtils from tllm.rpc.manager import RPCManager -from tllm.utils import tensor_to_list finish_reason_type = Literal["length", "stop", None] +logging.basicConfig(level=logging.INFO) + @dataclass class GenerateResult: @@ -32,9 +34,9 @@ class GenerateEnd: @dataclass class ForwardResult: - hidden_states: Optional[torch.Tensor] = None logits: torch.Tensor comm_cost_time_list: Optional[List[float]] = None + hidden_states: Optional[torch.Tensor] = None def is_generate_end(output_ids: List[int], eos_token_id: int, max_new_tokens: int) -> GenerateEnd: @@ -73,8 +75,10 @@ def from_pretrained(cls, model_path: str, weight_path: str, server: RPCManager, model.eval() return model - def _prepare_forward_data(self, uuid_str: str, hidden_states: torch.Tensor) -> Dict[str, Any]: - return {"uuid": uuid_str, "hidden_states": tensor_to_list(hidden_states)} + def _prepare_forward_data(self, uuid_str: str, hidden_states: torch.Tensor, need_serialize: bool) -> Dict[str, Any]: + if need_serialize: + hidden_states = serialize_bfloat16_tensor(hidden_states) + return {"uuid": uuid_str, "hidden_states": hidden_states} def forward(self, inputs_embeds: torch.Tensor, uuid_str: str) -> ForwardResult: hidden_states = inputs_embeds @@ -82,11 +86,10 @@ def forward(self, inputs_embeds: torch.Tensor, uuid_str: str) -> ForwardResult: for pp_idx in range(self.pp_size): s1 = time.time() outputs = self.server.post_sync( - pp_idx, "/forward", data=self._prepare_forward_data(uuid_str, hidden_states) + pp_idx, "/forward", data=self._prepare_forward_data(uuid_str, hidden_states, need_serialize=pp_idx == 0) ) + hidden_states = deserialize_bfloat16_tensor(outputs.output) if pp_idx == 0 else outputs.output s2 = time.time() - assert self.server.is_success(outputs), "Forward failed" - hidden_states = outputs.hidden_states comm_cost_time_list.append(s2 - s1 - outputs.cost_time) hidden_states = torch.tensor(hidden_states).to(inputs_embeds.dtype).to(self.norm.weight.device) diff --git a/tllm/rpc/client.py b/tllm/rpc/client.py index 3d8186d..dfc182b 100644 --- a/tllm/rpc/client.py +++ b/tllm/rpc/client.py @@ -9,7 +9,12 @@ import grpc from tllm.commons.communicator import Communicator, SingleNodeCommunicator -from tllm.commons.convert import list_to_protobuf, protobuf_to_list +from tllm.commons.convert import ( + deserialize_bfloat16_tensor, + list_to_protobuf, + protobuf_to_list, + serialize_bfloat16_tensor, +) from tllm.models.llama import MyLlamaModel from tllm.rpc import schemas_pb2, schemas_pb2_grpc from tllm.utils import get_ip_address, tensor_to_list @@ -66,12 +71,11 @@ def InitModel(self, request: schemas_pb2.ModelConfig, context: grpc.ServicerCont def Forward(self, request: schemas_pb2.ForwardRequest, context: grpc.ServicerContext): """ @param request: ForwardRequest - hidden_states: torch.Tensor + hidden_states: bytes uuid: str """ s1 = time.time() - hidden_states = protobuf_to_list(request.hidden_states) - hidden_states = torch.tensor(hidden_states, dtype=self.model.dtype) + hidden_states = deserialize_bfloat16_tensor(request.hidden_states) serialized_data = list(pickle.dumps((hidden_states.shape, request.uuid))) tensor_data = torch.ByteTensor(serialized_data) @@ -81,7 +85,7 @@ def Forward(self, request: schemas_pb2.ForwardRequest, context: grpc.ServicerCon output = self.model(hidden_states, request.uuid) - return_output = list_to_protobuf(tensor_to_list(output)) + return_output = serialize_bfloat16_tensor(output) cost_time = time.time() - s1 logging.info(f"{self.prefix_log_str} Forward pass cost time: {cost_time:.2f} s") diff --git a/tllm/rpc/manager.py b/tllm/rpc/manager.py index d5c46d2..0d4e8ff 100644 --- a/tllm/rpc/manager.py +++ b/tllm/rpc/manager.py @@ -38,7 +38,7 @@ def init_model(self, stub, data): return stub.InitModel(request) def forward(self, stub, data): - request = schemas_pb2.ForwardRequest(uuid=data["uuid"], hidden_states=list_to_protobuf(data["hidden_states"])) + request = schemas_pb2.ForwardRequest(uuid=data["uuid"], hidden_states=data["hidden_states"]) return stub.Forward(request) def health(self, stub): diff --git a/tllm/rpc/schemas.proto b/tllm/rpc/schemas.proto index 848918d..f4e63df 100644 --- a/tllm/rpc/schemas.proto +++ b/tllm/rpc/schemas.proto @@ -2,31 +2,12 @@ syntax = "proto3"; package schemas; -message Array { - repeated float elements = 1; // One-dimensional array of integers -} - -message Matrix { - repeated Array rows = 1; // Two-dimensional array of integers -} -message Tensor { - repeated Matrix layers = 1; // Three-dimensional array of integers +message BFloat16Tensor { + bytes data = 1; // 使用 bytes 存储 bfloat16 数据 + repeated int32 shape = 2; // 形状信息 } -message BlockTensor { - repeated Tensor blocks = 1; // Four-dimensional array of integers -} - -message MultiDimensionalArray { - // Union type to represent any dimensional array - oneof multi_array { - Array array = 1; - Matrix matrix = 2; - Tensor tensor = 3; - BlockTensor block_tensor = 4; - } -} message ModelConfig { string model_name = 1; @@ -39,7 +20,7 @@ message ModelConfig { message ForwardRequest { string uuid = 1; - MultiDimensionalArray hidden_states = 2; + BFloat16Tensor hidden_states = 2; } message StatusResponse { @@ -50,7 +31,7 @@ message StatusResponse { message ForwardResponse { string msg = 1; int32 status = 2; - MultiDimensionalArray output = 3; + BFloat16Tensor output = 3; float cost_time = 4; } diff --git a/tllm/rpc/schemas_pb2.py b/tllm/rpc/schemas_pb2.py index 366ca2d..d4cad91 100644 --- a/tllm/rpc/schemas_pb2.py +++ b/tllm/rpc/schemas_pb2.py @@ -19,7 +19,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x16tllm/rpc/schemas.proto\x12\x07schemas"\x19\n\x05\x41rray\x12\x10\n\x08\x65lements\x18\x01 \x03(\x02"&\n\x06Matrix\x12\x1c\n\x04rows\x18\x01 \x03(\x0b\x32\x0e.schemas.Array")\n\x06Tensor\x12\x1f\n\x06layers\x18\x01 \x03(\x0b\x32\x0f.schemas.Matrix".\n\x0b\x42lockTensor\x12\x1f\n\x06\x62locks\x18\x01 \x03(\x0b\x32\x0f.schemas.Tensor"\xbb\x01\n\x15MultiDimensionalArray\x12\x1f\n\x05\x61rray\x18\x01 \x01(\x0b\x32\x0e.schemas.ArrayH\x00\x12!\n\x06matrix\x18\x02 \x01(\x0b\x32\x0f.schemas.MatrixH\x00\x12!\n\x06tensor\x18\x03 \x01(\x0b\x32\x0f.schemas.TensorH\x00\x12,\n\x0c\x62lock_tensor\x18\x04 \x01(\x0b\x32\x14.schemas.BlockTensorH\x00\x42\r\n\x0bmulti_array"\x8c\x01\n\x0bModelConfig\x12\x12\n\nmodel_name\x18\x01 \x01(\t\x12\x0f\n\x07pp_rank\x18\x02 \x01(\x05\x12\x17\n\x0flayer_idx_start\x18\x03 \x01(\x05\x12\x15\n\rlayer_idx_end\x18\x04 \x01(\x05\x12\x12\n\nmaster_url\x18\x05 \x01(\t\x12\x14\n\x0cnext_pp_rank\x18\x06 \x01(\x05"U\n\x0e\x46orwardRequest\x12\x0c\n\x04uuid\x18\x01 \x01(\t\x12\x35\n\rhidden_states\x18\x02 \x01(\x0b\x32\x1e.schemas.MultiDimensionalArray"-\n\x0eStatusResponse\x12\x0b\n\x03msg\x18\x01 \x01(\t\x12\x0e\n\x06status\x18\x02 \x01(\x05"q\n\x0f\x46orwardResponse\x12\x0b\n\x03msg\x18\x01 \x01(\t\x12\x0e\n\x06status\x18\x02 \x01(\x05\x12.\n\x06output\x18\x03 \x01(\x0b\x32\x1e.schemas.MultiDimensionalArray\x12\x11\n\tcost_time\x18\x04 \x01(\x02"-\n\x0eHealthResponse\x12\x0b\n\x03msg\x18\x01 \x01(\t\x12\x0e\n\x06status\x18\x02 \x01(\x05"4\n\x15InitModelFlagResponse\x12\x0b\n\x03msg\x18\x01 \x01(\x08\x12\x0e\n\x06status\x18\x02 \x01(\x05"\x07\n\x05\x45mpty2\xfa\x01\n\nRPCService\x12:\n\tInitModel\x12\x14.schemas.ModelConfig\x1a\x17.schemas.StatusResponse\x12<\n\x07\x46orward\x12\x17.schemas.ForwardRequest\x1a\x18.schemas.ForwardResponse\x12\x31\n\x06Health\x12\x0e.schemas.Empty\x1a\x17.schemas.HealthResponse\x12?\n\rInitModelFlag\x12\x0e.schemas.Empty\x1a\x1e.schemas.InitModelFlagResponseb\x06proto3' + b'\n\x16tllm/rpc/schemas.proto\x12\x07schemas"\x19\n\x05\x41rray\x12\x10\n\x08\x65lements\x18\x01 \x03(\x02"&\n\x06Matrix\x12\x1c\n\x04rows\x18\x01 \x03(\x0b\x32\x0e.schemas.Array")\n\x06Tensor\x12\x1f\n\x06layers\x18\x01 \x03(\x0b\x32\x0f.schemas.Matrix".\n\x0b\x42lockTensor\x12\x1f\n\x06\x62locks\x18\x01 \x03(\x0b\x32\x0f.schemas.Tensor"-\n\x0e\x42\x46loat16Tensor\x12\x0c\n\x04\x64\x61ta\x18\x01 \x01(\x0c\x12\r\n\x05shape\x18\x02 \x03(\x05"\xbb\x01\n\x15MultiDimensionalArray\x12\x1f\n\x05\x61rray\x18\x01 \x01(\x0b\x32\x0e.schemas.ArrayH\x00\x12!\n\x06matrix\x18\x02 \x01(\x0b\x32\x0f.schemas.MatrixH\x00\x12!\n\x06tensor\x18\x03 \x01(\x0b\x32\x0f.schemas.TensorH\x00\x12,\n\x0c\x62lock_tensor\x18\x04 \x01(\x0b\x32\x14.schemas.BlockTensorH\x00\x42\r\n\x0bmulti_array"\x8c\x01\n\x0bModelConfig\x12\x12\n\nmodel_name\x18\x01 \x01(\t\x12\x0f\n\x07pp_rank\x18\x02 \x01(\x05\x12\x17\n\x0flayer_idx_start\x18\x03 \x01(\x05\x12\x15\n\rlayer_idx_end\x18\x04 \x01(\x05\x12\x12\n\nmaster_url\x18\x05 \x01(\t\x12\x14\n\x0cnext_pp_rank\x18\x06 \x01(\x05"N\n\x0e\x46orwardRequest\x12\x0c\n\x04uuid\x18\x01 \x01(\t\x12.\n\rhidden_states\x18\x02 \x01(\x0b\x32\x17.schemas.BFloat16Tensor"-\n\x0eStatusResponse\x12\x0b\n\x03msg\x18\x01 \x01(\t\x12\x0e\n\x06status\x18\x02 \x01(\x05"j\n\x0f\x46orwardResponse\x12\x0b\n\x03msg\x18\x01 \x01(\t\x12\x0e\n\x06status\x18\x02 \x01(\x05\x12\'\n\x06output\x18\x03 \x01(\x0b\x32\x17.schemas.BFloat16Tensor\x12\x11\n\tcost_time\x18\x04 \x01(\x02"-\n\x0eHealthResponse\x12\x0b\n\x03msg\x18\x01 \x01(\t\x12\x0e\n\x06status\x18\x02 \x01(\x05"4\n\x15InitModelFlagResponse\x12\x0b\n\x03msg\x18\x01 \x01(\x08\x12\x0e\n\x06status\x18\x02 \x01(\x05"\x07\n\x05\x45mpty2\xfa\x01\n\nRPCService\x12:\n\tInitModel\x12\x14.schemas.ModelConfig\x1a\x17.schemas.StatusResponse\x12<\n\x07\x46orward\x12\x17.schemas.ForwardRequest\x1a\x18.schemas.ForwardResponse\x12\x31\n\x06Health\x12\x0e.schemas.Empty\x1a\x17.schemas.HealthResponse\x12?\n\rInitModelFlag\x12\x0e.schemas.Empty\x1a\x1e.schemas.InitModelFlagResponseb\x06proto3' ) _globals = globals() @@ -35,22 +35,24 @@ _globals["_TENSOR"]._serialized_end = 143 _globals["_BLOCKTENSOR"]._serialized_start = 145 _globals["_BLOCKTENSOR"]._serialized_end = 191 - _globals["_MULTIDIMENSIONALARRAY"]._serialized_start = 194 - _globals["_MULTIDIMENSIONALARRAY"]._serialized_end = 381 - _globals["_MODELCONFIG"]._serialized_start = 384 - _globals["_MODELCONFIG"]._serialized_end = 524 - _globals["_FORWARDREQUEST"]._serialized_start = 526 - _globals["_FORWARDREQUEST"]._serialized_end = 611 - _globals["_STATUSRESPONSE"]._serialized_start = 613 - _globals["_STATUSRESPONSE"]._serialized_end = 658 - _globals["_FORWARDRESPONSE"]._serialized_start = 660 - _globals["_FORWARDRESPONSE"]._serialized_end = 773 - _globals["_HEALTHRESPONSE"]._serialized_start = 775 - _globals["_HEALTHRESPONSE"]._serialized_end = 820 - _globals["_INITMODELFLAGRESPONSE"]._serialized_start = 822 - _globals["_INITMODELFLAGRESPONSE"]._serialized_end = 874 - _globals["_EMPTY"]._serialized_start = 876 - _globals["_EMPTY"]._serialized_end = 883 - _globals["_RPCSERVICE"]._serialized_start = 886 - _globals["_RPCSERVICE"]._serialized_end = 1136 + _globals["_BFLOAT16TENSOR"]._serialized_start = 193 + _globals["_BFLOAT16TENSOR"]._serialized_end = 238 + _globals["_MULTIDIMENSIONALARRAY"]._serialized_start = 241 + _globals["_MULTIDIMENSIONALARRAY"]._serialized_end = 428 + _globals["_MODELCONFIG"]._serialized_start = 431 + _globals["_MODELCONFIG"]._serialized_end = 571 + _globals["_FORWARDREQUEST"]._serialized_start = 573 + _globals["_FORWARDREQUEST"]._serialized_end = 651 + _globals["_STATUSRESPONSE"]._serialized_start = 653 + _globals["_STATUSRESPONSE"]._serialized_end = 698 + _globals["_FORWARDRESPONSE"]._serialized_start = 700 + _globals["_FORWARDRESPONSE"]._serialized_end = 806 + _globals["_HEALTHRESPONSE"]._serialized_start = 808 + _globals["_HEALTHRESPONSE"]._serialized_end = 853 + _globals["_INITMODELFLAGRESPONSE"]._serialized_start = 855 + _globals["_INITMODELFLAGRESPONSE"]._serialized_end = 907 + _globals["_EMPTY"]._serialized_start = 909 + _globals["_EMPTY"]._serialized_end = 916 + _globals["_RPCSERVICE"]._serialized_start = 919 + _globals["_RPCSERVICE"]._serialized_end = 1169 # @@protoc_insertion_point(module_scope) diff --git a/tllm/rpc/schemas_pb2.pyi b/tllm/rpc/schemas_pb2.pyi index bc671cd..731b8e9 100644 --- a/tllm/rpc/schemas_pb2.pyi +++ b/tllm/rpc/schemas_pb2.pyi @@ -1,3 +1,6 @@ +from google.protobuf.internal import containers as _containers +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message from typing import ( ClassVar as _ClassVar, Iterable as _Iterable, @@ -6,9 +9,6 @@ from typing import ( Union as _Union, ) -from google.protobuf import descriptor as _descriptor, message as _message -from google.protobuf.internal import containers as _containers - DESCRIPTOR: _descriptor.FileDescriptor class Array(_message.Message): @@ -35,6 +35,14 @@ class BlockTensor(_message.Message): blocks: _containers.RepeatedCompositeFieldContainer[Tensor] def __init__(self, blocks: _Optional[_Iterable[_Union[Tensor, _Mapping]]] = ...) -> None: ... +class BFloat16Tensor(_message.Message): + __slots__ = ("data", "shape") + DATA_FIELD_NUMBER: _ClassVar[int] + SHAPE_FIELD_NUMBER: _ClassVar[int] + data: bytes + shape: _containers.RepeatedScalarFieldContainer[int] + def __init__(self, data: _Optional[bytes] = ..., shape: _Optional[_Iterable[int]] = ...) -> None: ... + class MultiDimensionalArray(_message.Message): __slots__ = ("array", "matrix", "tensor", "block_tensor") ARRAY_FIELD_NUMBER: _ClassVar[int] @@ -82,9 +90,9 @@ class ForwardRequest(_message.Message): UUID_FIELD_NUMBER: _ClassVar[int] HIDDEN_STATES_FIELD_NUMBER: _ClassVar[int] uuid: str - hidden_states: MultiDimensionalArray + hidden_states: BFloat16Tensor def __init__( - self, uuid: _Optional[str] = ..., hidden_states: _Optional[_Union[MultiDimensionalArray, _Mapping]] = ... + self, uuid: _Optional[str] = ..., hidden_states: _Optional[_Union[BFloat16Tensor, _Mapping]] = ... ) -> None: ... class StatusResponse(_message.Message): @@ -103,13 +111,13 @@ class ForwardResponse(_message.Message): COST_TIME_FIELD_NUMBER: _ClassVar[int] msg: str status: int - output: MultiDimensionalArray + output: BFloat16Tensor cost_time: float def __init__( self, msg: _Optional[str] = ..., status: _Optional[int] = ..., - output: _Optional[_Union[MultiDimensionalArray, _Mapping]] = ..., + output: _Optional[_Union[BFloat16Tensor, _Mapping]] = ..., cost_time: _Optional[float] = ..., ) -> None: ...