From b4b158495b61ea29a706fe82836e374480ff1dc5 Mon Sep 17 00:00:00 2001 From: sangchengmeng Date: Mon, 7 Jul 2025 17:43:45 +0800 Subject: [PATCH 1/8] [fix]fix image rpyc --- lightllm/models/whisper/whisper_audio.py | 2 +- lightllm/server/audioserver/manager.py | 2 +- .../embed_cache/impl/naive_memory_cache.py | 126 ++++++++++++------ lightllm/server/embed_cache/interface.py | 19 +-- lightllm/server/embed_cache/manager.py | 20 +-- lightllm/server/httpserver/manager.py | 56 ++++++-- lightllm/server/visualserver/manager.py | 9 +- .../visualserver/model_infer/model_rpc.py | 2 +- 8 files changed, 162 insertions(+), 74 deletions(-) diff --git a/lightllm/models/whisper/whisper_audio.py b/lightllm/models/whisper/whisper_audio.py index 4a96efbf1..7abb0e242 100644 --- a/lightllm/models/whisper/whisper_audio.py +++ b/lightllm/models/whisper/whisper_audio.py @@ -89,7 +89,7 @@ def __init__(self, kvargs): self.sampling_rate = 16000 self.max_length = self.max_seconds * self.sampling_rate self.cache_port = kvargs["cache_port"] - self.cache_client = rpyc.connect("localhost", self.cache_port) + self.cache_client = rpyc.connect("localhost", self.cache_port, config={"allow_pickle": True}) data_type = kvargs["data_type"] if data_type in ["bf16", "bfloat16"]: self.data_type = torch.bfloat16 diff --git a/lightllm/server/audioserver/manager.py b/lightllm/server/audioserver/manager.py index abd1416d1..dffd75965 100644 --- a/lightllm/server/audioserver/manager.py +++ b/lightllm/server/audioserver/manager.py @@ -33,7 +33,7 @@ def __init__( self.recv_from_visualserver = context.socket(zmq.PULL) self.recv_from_visualserver.bind(f"{args.zmq_mode}127.0.0.1:{audio_port}") - self.cache_client = rpyc.connect("localhost", cache_port) + self.cache_client = rpyc.connect("localhost", cache_port, config={"allow_pickle": True}) self.cache_port = cache_port self.waiting_reqs: List[GroupReqIndexes] = [] self.model_weightdir = args.model_dir diff --git a/lightllm/server/embed_cache/impl/naive_memory_cache.py b/lightllm/server/embed_cache/impl/naive_memory_cache.py index c03b084c4..f59634d6f 100644 --- a/lightllm/server/embed_cache/impl/naive_memory_cache.py +++ b/lightllm/server/embed_cache/impl/naive_memory_cache.py @@ -89,54 +89,100 @@ def _clear(self): if deleted >= max_delete: break - def alloc(self, md5sum: str, token_num: int) -> dict: + # def alloc(self, md5sum: str, token_num: int) -> dict: + # with self.lock: + # t = time.time() + # # add new record + # if md5sum not in self._md5_to_record: + + # # full, need to clear some unused items + # if self.occupied >= self.capacity: + # self._clear() + # if self.occupied >= self.capacity: + # return None + + # id = uuid.uuid1() + # id = id.int + # self._check_and_set_new_id_range(token_num) + # record = Record( + # id=id, + # md5sum=md5sum, + # ref=1, + # data=False, + # embed=False, + # createtime=t, + # visittime=t, + # token_id=self.token_id_range_start, + # token_num=token_num, + # ) + # self.token_id_range_start += token_num + # self._records[id] = record + # self._md5_to_record[md5sum] = record + # self.occupied += 1 + + # # cache hit + # else: + # record = self._md5_to_record[md5sum] + # record.visittime = t + # record.ref += 1 + + # return {"id": record.id, "token_id": record.token_id, "token_num": record.token_num} + + def alloc_batch(self, md5_list: list[str], token_num_list: list[int]) -> list[dict]: + results = [] with self.lock: - t = time.time() - # add new record - if md5sum not in self._md5_to_record: - - # full, need to clear some unused items - if self.occupied >= 
self.capacity: - self._clear() + for md5, tnum in zip(md5_list, token_num_list): + t = time.time() + if md5 not in self._md5_to_record: + # 若不存在则分配新记录(与alloc逻辑相同) if self.occupied >= self.capacity: - return None - - id = uuid.uuid1() - id = id.int - self._check_and_set_new_id_range(token_num) - record = Record( - id=id, - md5sum=md5sum, - ref=1, - data=False, - embed=False, - createtime=t, - visittime=t, - token_id=self.token_id_range_start, - token_num=token_num, - ) - self.token_id_range_start += token_num - self._records[id] = record - self._md5_to_record[md5sum] = record - self.occupied += 1 - - # cache hit - else: - record = self._md5_to_record[md5sum] - record.visittime = t - record.ref += 1 - - return {"id": record.id, "token_id": record.token_id, "token_num": record.token_num} + self._clear() + if self.occupied >= self.capacity: + results.append(None) + continue + new_id = uuid.uuid1().int + self._check_and_set_new_id_range(tnum) + record = Record( + id=new_id, + md5sum=md5, + ref=1, + data=False, + embed=False, + createtime=t, + visittime=t, + token_id=self.token_id_range_start, + token_num=tnum, + ) + self.token_id_range_start += tnum + self._records[new_id] = record + self._md5_to_record[md5] = record + self.occupied += 1 + else: + # 缓存命中,更新引用计数和访问时间 + record = self._md5_to_record[md5] + record.visittime = t + record.ref += 1 + results.append({"id": record.id, "token_id": record.token_id, "token_num": record.token_num}) + return results def release(self, id: int) -> None: with self.lock: self._records[id].ref -= 1 - def set_item_data(self, id: int) -> None: - self._records[id].data = True + # def set_item_data(self, id: int) -> None: + # self._records[id].data = True + + # def get_item_data(self, id: int) -> bool: + # return self._records[id].data - def get_item_data(self, id: int) -> bool: - return self._records[id].data + def get_items_data(self, ids: list[int]) -> list[bool]: + with self.lock: + return [self._records.get(i).data if i in self._records else False for i in ids] + + def set_items_data(self, ids: list[int]) -> None: + with self.lock: + for i in ids: + self._records[i].data = True def set_item_embed(self, id: int) -> None: self._records[id].embed = True diff --git a/lightllm/server/embed_cache/interface.py b/lightllm/server/embed_cache/interface.py index 030b59986..8553e83df 100644 --- a/lightllm/server/embed_cache/interface.py +++ b/lightllm/server/embed_cache/interface.py @@ -1,21 +1,22 @@ from typing import Union + class CacheManager(object): - ''' Defines the interface of embedding cache manager. 
- ''' + """Defines the interface of embedding cache manager.""" + def __init__(self) -> None: pass - def alloc(self, md5sum: str, token_num: int) -> dict: + def alloc_batch(self, md5sum_list: list[str], token_num_list: list[int]) -> list[dict]: pass def release(self, id: int) -> None: pass - def set_item_data(self, id: int) -> None: + def set_items_data(self, ids: list[int]) -> None: pass - def get_item_data(self, id: int) -> bool: + def get_items_data(self, ids: list[int]) -> list[bool]: pass def set_item_embed(self, id: int) -> None: @@ -38,11 +39,11 @@ def add_register_item(key, value): cls._impls[key] = value return value - if callable(target): # 如果传入的目标可调用,说明之前没有给出注册名字,我们就以传入的函数或者类的名字作为注册名 + if callable(target): # 如果传入的目标可调用,说明之前没有给出注册名字,我们就以传入的函数或者类的名字作为注册名 return add_register_item(target.__name__, target) - else: # 如果不可调用,说明额外说明了注册的可调用对象的名字 - return lambda x : add_register_item(target, x) - + else: # 如果不可调用,说明额外说明了注册的可调用对象的名字 + return lambda x: add_register_item(target, x) + @classmethod def get_impl(cls, name: str): return cls._impls[name] diff --git a/lightllm/server/embed_cache/manager.py b/lightllm/server/embed_cache/manager.py index 85ed32505..314bcec4d 100644 --- a/lightllm/server/embed_cache/manager.py +++ b/lightllm/server/embed_cache/manager.py @@ -22,23 +22,23 @@ def on_disconnect(self, conn): # (to finalize the service, if needed) pass - def exposed_alloc(self, md5sum: str, token_num: int) -> dict: - md5sum = obtain(md5sum) - token_num = obtain(token_num) - record = self._impl.alloc(md5sum, token_num) + def exposed_alloc_batch(self, md5sum_list: list[str], token_num_list: list[int]) -> dict: + md5sum_list = obtain(md5sum_list) + token_num_list = obtain(token_num_list) + record = self._impl.alloc(md5sum_list, token_num_list) return record def exposed_release(self, id: int) -> None: id = obtain(id) return self._impl.release(id) - def exposed_set_item_data(self, id: int) -> None: - id = obtain(id) - return self._impl.set_item_data(id=id) + def exposed_set_items_data(self, ids: list[int]) -> None: + ids = obtain(ids) + return self._impl.set_items_data(ids=ids) - def exposed_get_item_data(self, id: int) -> bool: - id = obtain(id) - return self._impl.get_item_data(id=id) + def exposed_get_items_data(self, ids: list[int]) -> list[bool]: + ids = obtain(ids) + return self._impl.get_items_data(ids=ids) def exposed_set_item_embed(self, id: int) -> None: id = obtain(id) diff --git a/lightllm/server/httpserver/manager.py b/lightllm/server/httpserver/manager.py index 72a33f5e0..020bfdd0b 100644 --- a/lightllm/server/httpserver/manager.py +++ b/lightllm/server/httpserver/manager.py @@ -81,7 +81,7 @@ def __init__( self.enable_multimodal = enable_multimodal if self.enable_multimodal: - self.cache_client = rpyc.connect("localhost", cache_port) + self.cache_client = rpyc.connect("localhost", cache_port, onfig={"allow_pickle": True}) self.send_to_visual = context.socket(zmq.PUSH) self.send_to_visual.connect(f"{args.zmq_mode}127.0.0.1:{visual_port}") @@ -148,19 +148,55 @@ async def _alloc_multimodal_resources(self, multimodal_params: MultimodalParams, # 如果不加任何锁,假如请求1和请求2都有6张图片,而cache_capacity为10, # 那么如果某一时刻shm中存在请求1的5张图和请求2的5张图,将会资源竞争产生死锁。 async with self._resource_lock: + items, md5s, token_nums, datas = [], [], [], [] for img in multimodal_params.images: self.tokenizer.init_imageitem_extral_params(img, multimodal_params, sampling_params) - record = await self._alloc_resource(img) - img.uuid = record["id"] - img.token_id = record["token_id"] - img.token_num = record["token_num"] + data = 
img.read() + # must after init_imageitem_extral_params + num_tokens = self.tokenizer.get_image_token_length(img) + md5 = hashlib.md5(data).hexdigest() + "_" + str(hash(frozendict(img.extra_params))) + md5s.append(md5) + token_nums.append(num_tokens) + datas.append(data) + items.append(img) + # img.uuid = record["id"] + # img.token_id = record["token_id"] + # img.token_num = record["token_num"] for audio in multimodal_params.audios: self.tokenizer.init_audioitem_extral_params(audio, multimodal_params, sampling_params) - record = await self._alloc_resource(audio) - audio.uuid = record["id"] - audio.token_id = record["token_id"] - audio.token_num = record["token_num"] - return + data = audio.read() + num_tokens = self.tokenizer.get_audio_token_length(audio) + md5 = hashlib.md5(data).hexdigest() + "_" + str(hash(frozendict(audio.extra_params))) + md5s.append(md5) + token_nums.append(num_tokens) + datas.append(data) + items.append(audio) + # audio.uuid = record["id"] + # audio.token_id = record["token_id"] + # audio.token_num = record["token_num"] + wait_time = 1 + while True: + records = self.cache_client.root.alloc_batch(md5s, token_nums) + if all(r is not None for r in records): + # hit or new + break + # cache full + await asyncio.sleep(wait_time) + wait_time = min(wait_time + 2, 9) + uids = [record["id"] for record in records] + data_ready = self.cache_client.root.get_items_data(uids) + + uids_to_write = [] + for item, record, data, ready in zip(items, records, datas, data_ready): + item.uuid = record["id"] + item.token_id = record["token_id"] + item.token_num = record["token_num"] + if not ready: + create_shm(get_shm_name_data(item.uuid), data) + self.cache_client.root.set_items_data(item.uuid) + uids_to_write.append(item.uuid) + if uids_to_write: + self.cache_client.root.set_items_data(uids_to_write) async def _release_multimodal_resources(self, multimodal_params: MultimodalParams): # 只有 P 和 NORMAL 节点需要真的管理多模态资源 diff --git a/lightllm/server/visualserver/manager.py b/lightllm/server/visualserver/manager.py index 6fabe2465..f16b0bd66 100644 --- a/lightllm/server/visualserver/manager.py +++ b/lightllm/server/visualserver/manager.py @@ -120,8 +120,13 @@ async def loop_for_fwd(self): multimodal_params = group_req_indexes.multimodal_params - for img in multimodal_params.images: - if not self.cache_client.root.get_item_embed(img.uuid): + img_uuids = [img.uuid for img in multimodal_params.images] + ready_flags = [] + for uuid in img_uuids: + ready_flags.append(self.cache_client.root.get_items_embed(uuid)) + + for img, ready in zip(multimodal_params.images, ready_flags): + if not ready: images_need_infer.append(img) if len(images_need_infer) == self.infer_batch_size: diff --git a/lightllm/server/visualserver/model_infer/model_rpc.py b/lightllm/server/visualserver/model_infer/model_rpc.py index 2d0d99a50..a275a4002 100644 --- a/lightllm/server/visualserver/model_infer/model_rpc.py +++ b/lightllm/server/visualserver/model_infer/model_rpc.py @@ -38,7 +38,7 @@ def exposed_init_model(self, kvargs): self.cache_port = kvargs["cache_port"] weight_dir = kvargs["weight_dir"] self.vit_rank_id = kvargs["vit_rank_id"] - self.cache_client = rpyc.connect("localhost", self.cache_port) + self.cache_client = rpyc.connect("localhost", self.cache_port, config={"allow_pickle": True}) self.data_type = kvargs["data_type"] init_vision_distributed_env(kvargs) From 5abacf2dc76086a72127a39f2c6f5aa6cf3c18e5 Mon Sep 17 00:00:00 2001 From: sangchengmeng Date: Mon, 7 Jul 2025 21:22:51 +0800 Subject: [PATCH 2/8] [fix]fix rpyc 
in multimodal process --- lightllm/models/whisper/whisper_audio.py | 16 ++- lightllm/server/audioserver/manager.py | 7 +- .../embed_cache/impl/naive_memory_cache.py | 101 ++++---------- lightllm/server/embed_cache/interface.py | 49 ------- lightllm/server/embed_cache/manager.py | 29 ++-- lightllm/server/httpserver/manager.py | 125 +++++++++--------- lightllm/server/visualserver/manager.py | 8 +- .../visualserver/model_infer/model_rpc.py | 18 ++- 8 files changed, 132 insertions(+), 221 deletions(-) delete mode 100644 lightllm/server/embed_cache/interface.py diff --git a/lightllm/models/whisper/whisper_audio.py b/lightllm/models/whisper/whisper_audio.py index 7abb0e242..bc36a9d21 100644 --- a/lightllm/models/whisper/whisper_audio.py +++ b/lightllm/models/whisper/whisper_audio.py @@ -190,8 +190,14 @@ def encode(self, audio_items: List[AudioItem]): audio_lens_after_cnn = np.array(audio_lens_after_cnn, dtype=np.int32) audio_token_num = (audio_lens_after_cnn - 2) // 2 + 1 - for i in range(len(uuids)): - if not self.cache_client.root.get_item_embed(uuids[i]): - cur_embed_bytes = tensor2bytes(audios[i][: audio_token_num[i]]) - create_shm(get_shm_name_embed(uuids[i]), cur_embed_bytes) - self.cache_client.root.set_item_embed(uuids[i]) + ready_audio = self.cache_client.root.get_items_data(uuids) + ids_to_set = [] + for i, ready in enumerate(ready_audio): + if ready: + continue + uid = uuids[i] + cur_embed_bytes = tensor2bytes(audios[i][: audio_token_num[i]]) + create_shm(get_shm_name_data(uid), cur_embed_bytes) + ids_to_set.append(uid) + if ids_to_set: + self.cache_client.root.set_items_data(ids=ids_to_set) diff --git a/lightllm/server/audioserver/manager.py b/lightllm/server/audioserver/manager.py index dffd75965..a403f5ae7 100644 --- a/lightllm/server/audioserver/manager.py +++ b/lightllm/server/audioserver/manager.py @@ -94,8 +94,11 @@ async def loop_for_fwd(self): multimodal_params = group_req_indexes.multimodal_params - for audio in multimodal_params.audios: - if not self.cache_client.root.get_item_embed(audio.uuid): + audio_uuids = [audio.uuid for audio in multimodal_params.audios] + ready_audio = self.cache_client.root.get_items_embed(audio_uuids) + + for audio, ready in zip(multimodal_params.audios, ready_audio): + if not ready: audios_need_infer.append(audio) if len(audios_need_infer) == self.infer_batch_size: diff --git a/lightllm/server/embed_cache/impl/naive_memory_cache.py b/lightllm/server/embed_cache/impl/naive_memory_cache.py index f59634d6f..0ba701245 100644 --- a/lightllm/server/embed_cache/impl/naive_memory_cache.py +++ b/lightllm/server/embed_cache/impl/naive_memory_cache.py @@ -2,7 +2,6 @@ import threading import dataclasses import requests -from ..interface import CacheManager, CacheManagerFactory from typing import Union import torch import time @@ -27,8 +26,7 @@ class Record(object): token_num: int -@CacheManagerFactory.register("naive") -class InMemoryCache(CacheManager): +class InMemoryCache: def __init__(self, args) -> None: self.args = args self._records = dict() @@ -89,103 +87,58 @@ def _clear(self): if deleted >= max_delete: break - # def alloc(self, md5sum: str, token_num: int) -> dict: - # with self.lock: - # t = time.time() - # # add new record - # if md5sum not in self._md5_to_record: - - # # full, need to clear some unused items - # if self.occupied >= self.capacity: - # self._clear() - # if self.occupied >= self.capacity: - # return None - - # id = uuid.uuid1() - # id = id.int - # self._check_and_set_new_id_range(token_num) - # record = Record( - # id=id, - # 
md5sum=md5sum, - # ref=1, - # data=False, - # embed=False, - # createtime=t, - # visittime=t, - # token_id=self.token_id_range_start, - # token_num=token_num, - # ) - # self.token_id_range_start += token_num - # self._records[id] = record - # self._md5_to_record[md5sum] = record - # self.occupied += 1 - - # # cache hit - # else: - # record = self._md5_to_record[md5sum] - # record.visittime = t - # record.ref += 1 - - # return {"id": record.id, "token_id": record.token_id, "token_num": record.token_num} - - def alloc_batch(self, md5_list: list[str], token_num_list: list[int]) -> list[dict]: + def alloc(self, md5sum_list: list[str], token_num_list: list[int]) -> list[dict]: results = [] with self.lock: - for md5, tnum in zip(md5_list, token_num_list): + for md5sum, token_num in zip(md5sum_list, token_num_list): t = time.time() - if md5 not in self._md5_to_record: - # 若不存在则分配新记录(与alloc逻辑相同) + if md5sum not in self._md5_to_record: if self.occupied >= self.capacity: self._clear() if self.occupied >= self.capacity: results.append(None) continue - new_id = uuid.uuid1().int - self._check_and_set_new_id_range(tnum) + id = uuid.uuid1() + id = id.int + self._check_and_set_new_id_range(token_num) record = Record( - id=new_id, - md5sum=md5, + id=id, + md5sum=md5sum, ref=1, data=False, embed=False, createtime=t, visittime=t, token_id=self.token_id_range_start, - token_num=tnum, + token_num=token_num, ) - self.token_id_range_start += tnum - self._records[new_id] = record - self._md5_to_record[md5] = record + self.token_id_range_start += token_num + self._records[id] = record + self._md5_to_record[md5sum] = record self.occupied += 1 + # cache hit else: - # 缓存命中,更新引用计数和访问时间 - record = self._md5_to_record[md5] + record = self._md5_to_record[md5sum] record.visittime = t record.ref += 1 results.append({"id": record.id, "token_id": record.token_id, "token_num": record.token_num}) return results - def release(self, id: int) -> None: + def release(self, ids: list[int]) -> None: with self.lock: - self._records[id].ref -= 1 - - # def set_item_data(self, id: int) -> None: - # self._records[id].data = True + for id in ids: + self._records[id].ref -= 1 - # def get_item_data(self, id: int) -> bool: - # return self._records[id].data + def set_items_data(self, ids: list[int]) -> None: + for id in ids: + self._records[id].data = True def get_items_data(self, ids: list[int]) -> list[bool]: - with self.lock: - return [self._records.get(i).data if i in self._records else False for i in ids] - - def set_items_data(self, ids: list[int]) -> None: - with self.lock: - for i in ids: - self._records[i].data = True + return [self._records.get(i).data if i in self._records else False for i in ids] - def set_item_embed(self, id: int) -> None: - self._records[id].embed = True + def set_items_embed(self, ids: list[int]) -> None: + for id in ids: + self._records[id].embed = True - def get_item_embed(self, id: int) -> bool: - return self._records[id].embed + def get_items_embed(self, ids: list[int]) -> list[bool]: + return [self._records.get(i).embed if i in self._records else False for i in ids] diff --git a/lightllm/server/embed_cache/interface.py b/lightllm/server/embed_cache/interface.py deleted file mode 100644 index 8553e83df..000000000 --- a/lightllm/server/embed_cache/interface.py +++ /dev/null @@ -1,49 +0,0 @@ -from typing import Union - - -class CacheManager(object): - """Defines the interface of embedding cache manager.""" - - def __init__(self) -> None: - pass - - def alloc_batch(self, md5sum_list: list[str], token_num_list: 
list[int]) -> list[dict]: - pass - - def release(self, id: int) -> None: - pass - - def set_items_data(self, ids: list[int]) -> None: - pass - - def get_items_data(self, ids: list[int]) -> list[bool]: - pass - - def set_item_embed(self, id: int) -> None: - pass - - def get_item_embed(self, id: int) -> bool: - pass - - -class CacheManagerFactory(object): - _impls = dict() - - @classmethod - def register(cls, target): - def add_register_item(key, value): - if not callable(value): - raise Exception(f"register object must be callable! But receice:{value} is not callable!") - if key in cls._impls: - print(f"warning: \033[33m{value.__name__} has been registered before, so we will overriden it\033[0m") - cls._impls[key] = value - return value - - if callable(target): # 如果传入的目标可调用,说明之前没有给出注册名字,我们就以传入的函数或者类的名字作为注册名 - return add_register_item(target.__name__, target) - else: # 如果不可调用,说明额外说明了注册的可调用对象的名字 - return lambda x: add_register_item(target, x) - - @classmethod - def get_impl(cls, name: str): - return cls._impls[name] diff --git a/lightllm/server/embed_cache/manager.py b/lightllm/server/embed_cache/manager.py index 314bcec4d..fe1d0a441 100644 --- a/lightllm/server/embed_cache/manager.py +++ b/lightllm/server/embed_cache/manager.py @@ -3,12 +3,12 @@ import inspect from typing import Union from lightllm.utils.graceful_utils import graceful_registry -from .interface import CacheManager +from lightllm.server.embed_cache.impl.naive_memory_cache import InMemoryCache from rpyc.utils.classic import obtain class CacheServer(rpyc.Service): - def __init__(self, manager_impl: CacheManager) -> None: + def __init__(self, manager_impl: InMemoryCache) -> None: super().__init__() self._impl = manager_impl @@ -22,15 +22,15 @@ def on_disconnect(self, conn): # (to finalize the service, if needed) pass - def exposed_alloc_batch(self, md5sum_list: list[str], token_num_list: list[int]) -> dict: + def exposed_alloc(self, md5sum_list: list[str], token_num_list: list[int]) -> dict: md5sum_list = obtain(md5sum_list) token_num_list = obtain(token_num_list) record = self._impl.alloc(md5sum_list, token_num_list) return record - def exposed_release(self, id: int) -> None: - id = obtain(id) - return self._impl.release(id) + def exposed_release(self, ids: list[int]) -> None: + ids = obtain(ids) + return self._impl.release(ids) def exposed_set_items_data(self, ids: list[int]) -> None: ids = obtain(ids) @@ -40,23 +40,20 @@ def exposed_get_items_data(self, ids: list[int]) -> list[bool]: ids = obtain(ids) return self._impl.get_items_data(ids=ids) - def exposed_set_item_embed(self, id: int) -> None: - id = obtain(id) - return self._impl.set_item_embed(id=id) + def exposed_set_items_embed(self, ids: list[int]) -> None: + ids = obtain(ids) + return self._impl.set_items_embed(ids=ids) - def exposed_get_item_embed(self, id: int) -> bool: - id = obtain(id) - return self._impl.get_item_embed(id=id) + def exposed_get_items_embed(self, ids: list[int]) -> list[bool]: + ids = obtain(ids) + return self._impl.get_items_embed(ids=ids) def start_cache_manager(port: int, args, pipe_writer): # 注册graceful 退出的处理 graceful_registry(inspect.currentframe().f_code.co_name) - from .interface import CacheManagerFactory - - manager_cls = CacheManagerFactory.get_impl("naive") - manager = manager_cls(args) + manager = InMemoryCache(args) service = CacheServer(manager) from rpyc.utils.server import ThreadedServer diff --git a/lightllm/server/httpserver/manager.py b/lightllm/server/httpserver/manager.py index 020bfdd0b..ea27af5ad 100644 --- 
a/lightllm/server/httpserver/manager.py +++ b/lightllm/server/httpserver/manager.py @@ -81,7 +81,7 @@ def __init__( self.enable_multimodal = enable_multimodal if self.enable_multimodal: - self.cache_client = rpyc.connect("localhost", cache_port, onfig={"allow_pickle": True}) + self.cache_client = rpyc.connect("localhost", cache_port, config={"allow_pickle": True}) self.send_to_visual = context.socket(zmq.PUSH) self.send_to_visual.connect(f"{args.zmq_mode}127.0.0.1:{visual_port}") @@ -113,33 +113,53 @@ def __init__( self.latest_success_infer_time_mark.set_value(int(time.time())) return - # connect cache server, calculate md5, alloc resource, return uuid - async def _alloc_resource(self, item: Union[ImageItem, AudioItem]): - if isinstance(item, ImageItem): - data = item.read() - # must after init_imageitem_extral_params - num_tokens = self.tokenizer.get_image_token_length(item) - elif isinstance(item, AudioItem): - data = item.read() - num_tokens = self.tokenizer.get_audio_token_length(item) - else: - raise ValueError(f"unexpected item type {type(item)}") - - md5sum = hashlib.md5(data).hexdigest() + "_" + str(hash(frozendict(item.extra_params))) + async def _alloc_resource(self, items, md5sums, tokens_nums, datas): wait_time = 1 - while True: - record = self.cache_client.root.alloc(md5sum, num_tokens) - # hit or new - if record: - uid = record["id"] - if not self.cache_client.root.get_item_data(uid): - create_shm(get_shm_name_data(uid), data) - self.cache_client.root.set_item_data(uid) - return record - # cache full - else: + pending_idx = list(range(len(items))) + while pending_idx: + sub_md5sum = [md5sums[i] for i in pending_idx] + sub_tokens_num = [tokens_nums[i] for i in pending_idx] + + records = self.cache_client.root.alloc(sub_md5sum, sub_tokens_num) + + if all(record is None for record in records): await asyncio.sleep(wait_time) wait_time = min(wait_time + 2, 9) + continue + + next_pending = [] # record为None,安排在下一轮 + uids_to_check = [] # record存在,本轮处理 + uid_to_idx = {} # uid → 原items下标 + + for local_pos, record in enumerate(records): + global_pos = pending_idx[local_pos] + + if record is None: + next_pending.append(global_pos) + continue + + uid = record["id"] + uid_to_idx[uid] = global_pos + uids_to_check.append(uid) + + item = items[global_pos] + item.uuid = uid + item.token_id = record["token_id"] + item.token_num = record["token_num"] + + if uids_to_check: + ready_flags = self.cache_client.root.get_items_data(uids_to_check) + need_write = [] + + for uid, ready in zip(uids_to_check, ready_flags): + if not ready: + idx = uid_to_idx[uid] + create_shm(get_shm_name_data(uid), datas[idx]) + need_write.append(uid) + if need_write: + self.cache_client.root.set_items_data(need_write) + pending_idx = next_pending + return async def _alloc_multimodal_resources(self, multimodal_params: MultimodalParams, sampling_params: SamplingParams): # 只有 P 和 NORMAL 节点需要真的管理多模态资源 @@ -148,74 +168,51 @@ async def _alloc_multimodal_resources(self, multimodal_params: MultimodalParams, # 如果不加任何锁,假如请求1和请求2都有6张图片,而cache_capacity为10, # 那么如果某一时刻shm中存在请求1的5张图和请求2的5张图,将会资源竞争产生死锁。 async with self._resource_lock: - items, md5s, token_nums, datas = [], [], [], [] + items, md5sums, tokens_nums, datas = [], [], [], [] for img in multimodal_params.images: self.tokenizer.init_imageitem_extral_params(img, multimodal_params, sampling_params) data = img.read() # must after init_imageitem_extral_params - num_tokens = self.tokenizer.get_image_token_length(img) - md5 = hashlib.md5(data).hexdigest() + "_" + 
str(hash(frozendict(img.extra_params))) - md5s.append(md5) - token_nums.append(num_tokens) + tokens_num = self.tokenizer.get_image_token_length(img) + md5sum = hashlib.md5(data).hexdigest() + "_" + str(hash(frozendict(img.extra_params))) + md5sums.append(md5sum) + tokens_nums.append(tokens_num) datas.append(data) items.append(img) - # img.uuid = record["id"] - # img.token_id = record["token_id"] - # img.token_num = record["token_num"] for audio in multimodal_params.audios: self.tokenizer.init_audioitem_extral_params(audio, multimodal_params, sampling_params) data = audio.read() - num_tokens = self.tokenizer.get_audio_token_length(audio) - md5 = hashlib.md5(data).hexdigest() + "_" + str(hash(frozendict(audio.extra_params))) - md5s.append(md5) - token_nums.append(num_tokens) + tokens_num = self.tokenizer.get_audio_token_length(audio) + md5sum = hashlib.md5(data).hexdigest() + "_" + str(hash(frozendict(audio.extra_params))) + md5sums.append(md5sum) + tokens_nums.append(tokens_num) datas.append(data) items.append(audio) - # audio.uuid = record["id"] - # audio.token_id = record["token_id"] - # audio.token_num = record["token_num"] - wait_time = 1 - while True: - records = self.cache_client.root.alloc_batch(md5s, token_nums) - if all(r is not None for r in records): - # hit or new - break - # cache full - await asyncio.sleep(wait_time) - wait_time = min(wait_time + 2, 9) - uids = [record["id"] for record in records] - data_ready = self.cache_client.root.get_items_data(uids) - uids_to_write = [] - for item, record, data, ready in zip(items, records, datas, data_ready): - item.uuid = record["id"] - item.token_id = record["token_id"] - item.token_num = record["token_num"] - if not ready: - create_shm(get_shm_name_data(item.uuid), data) - self.cache_client.root.set_items_data(item.uuid) - uids_to_write.append(item.uuid) - if uids_to_write: - self.cache_client.root.set_items_data(uids_to_write) + await self._alloc_resource(items, md5sums, tokens_nums, datas) + return async def _release_multimodal_resources(self, multimodal_params: MultimodalParams): # 只有 P 和 NORMAL 节点需要真的管理多模态资源 if self.pd_mode.is_P_or_NORMAL(): if multimodal_params is not None: + ids_to_release = [] for img in multimodal_params.images: if img.uuid is not None: - self.cache_client.root.release(img.uuid) + ids_to_release.append(img.uuid) # 将 uuid 等 赋值为 None, 防止因为abort等异常情况造成重复释放异常 img.uuid = None img.token_id = None img.token_num = None for audio in multimodal_params.audios: if audio.uuid is not None: - self.cache_client.root.release(audio.uuid) + ids_to_release.append(audio.uuid) # 将 uuid 等 赋值为 None, 防止因为abort等异常情况造成重复释放异常 audio.uuid = None audio.token_id = None audio.token_num = None + if ids_to_release: + self.cache_client.root.release(ids_to_release) return def tokens(self, prompt, multimodal_params, samping_params: SamplingParams, kwargs=None): diff --git a/lightllm/server/visualserver/manager.py b/lightllm/server/visualserver/manager.py index f16b0bd66..5d97074e7 100644 --- a/lightllm/server/visualserver/manager.py +++ b/lightllm/server/visualserver/manager.py @@ -35,7 +35,7 @@ def __init__( self.recv_from_httpserver = context.socket(zmq.PULL) self.recv_from_httpserver.bind(f"{args.zmq_mode}127.0.0.1:{visual_port}") - self.cache_client = rpyc.connect("localhost", cache_port) + self.cache_client = rpyc.connect("localhost", cache_port, config={"allow_pickle": True}) self.cache_port = cache_port self.waiting_reqs: List[GroupReqIndexes] = [] self.model_weightdir = args.model_dir @@ -121,11 +121,9 @@ async def loop_for_fwd(self): 
multimodal_params = group_req_indexes.multimodal_params img_uuids = [img.uuid for img in multimodal_params.images] - ready_flags = [] - for uuid in img_uuids: - ready_flags.append(self.cache_client.root.get_items_embed(uuid)) + ready_image = self.cache_client.root.get_items_embed(img_uuids) - for img, ready in zip(multimodal_params.images, ready_flags): + for img, ready in zip(multimodal_params.images, ready_image): if not ready: images_need_infer.append(img) diff --git a/lightllm/server/visualserver/model_infer/model_rpc.py b/lightllm/server/visualserver/model_infer/model_rpc.py index a275a4002..2ca384886 100644 --- a/lightllm/server/visualserver/model_infer/model_rpc.py +++ b/lightllm/server/visualserver/model_infer/model_rpc.py @@ -94,14 +94,20 @@ def exposed_encode(self, images: List[ImageItem]): images = obtain(images) all_img_embeds, uuids, valid_ids = self.forward(images) all_img_embeds = all_img_embeds.to(torch.device("cpu")) + if self.tp_rank_id == 0: - for i in range(len(uuids)): + ready_flags = self.cache_client.root.get_items_embed(uuids) + ids_to_set = [] + for i, ready in enumerate(ready_flags): + if ready: + continue uid = uuids[i] - if not self.cache_client.root.get_item_embed(uid): - start, end = valid_ids[i] - cur_embed_bytes = tensor2bytes(all_img_embeds[start:end]) - create_shm(get_shm_name_embed(uuids[i]), cur_embed_bytes) - self.cache_client.root.set_item_embed(uuids[i]) + start, end = valid_ids[i] + cur_embed_bytes = tensor2bytes(all_img_embeds[start:end]) + create_shm(get_shm_name_embed(uid), cur_embed_bytes) + ids_to_set.append(uid) + if ids_to_set: + self.cache_client.root.set_items_embed(ids_to_set) return From ec0e0680d7c7f8aac00fcda0bb6839080f4e0171 Mon Sep 17 00:00:00 2001 From: niushengxiao Date: Tue, 15 Jul 2025 10:57:55 +0000 Subject: [PATCH 3/8] [fix]fix image rpyc --- lightllm/models/whisper/whisper_audio.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/lightllm/models/whisper/whisper_audio.py b/lightllm/models/whisper/whisper_audio.py index bc36a9d21..7eb37f463 100644 --- a/lightllm/models/whisper/whisper_audio.py +++ b/lightllm/models/whisper/whisper_audio.py @@ -193,11 +193,10 @@ def encode(self, audio_items: List[AudioItem]): ready_audio = self.cache_client.root.get_items_data(uuids) ids_to_set = [] for i, ready in enumerate(ready_audio): - if ready: - continue - uid = uuids[i] - cur_embed_bytes = tensor2bytes(audios[i][: audio_token_num[i]]) - create_shm(get_shm_name_data(uid), cur_embed_bytes) - ids_to_set.append(uid) + if not ready: + uid = uuids[i] + cur_embed_bytes = tensor2bytes(audios[i][: audio_token_num[i]]) + create_shm(get_shm_name_embed(uid), cur_embed_bytes) + ids_to_set.append(uid) if ids_to_set: self.cache_client.root.set_items_data(ids=ids_to_set) From e12d4a32902b45e4be632a084dc6a81830e6c6bd Mon Sep 17 00:00:00 2001 From: niushengxiao Date: Wed, 16 Jul 2025 06:21:01 +0000 Subject: [PATCH 4/8] [fix]batch rpyc call in multimodal --- lightllm/models/whisper/whisper_audio.py | 2 +- .../embed_cache/impl/naive_memory_cache.py | 53 +++++++++-------- lightllm/server/embed_cache/manager.py | 12 ++-- lightllm/server/httpserver/manager.py | 59 +++++++------------ 4 files changed, 54 insertions(+), 72 deletions(-) diff --git a/lightllm/models/whisper/whisper_audio.py b/lightllm/models/whisper/whisper_audio.py index 7eb37f463..55a870e0e 100644 --- a/lightllm/models/whisper/whisper_audio.py +++ b/lightllm/models/whisper/whisper_audio.py @@ -190,7 +190,7 @@ def encode(self, audio_items: List[AudioItem]): 
audio_lens_after_cnn = np.array(audio_lens_after_cnn, dtype=np.int32) audio_token_num = (audio_lens_after_cnn - 2) // 2 + 1 - ready_audio = self.cache_client.root.get_items_data(uuids) + ready_audio = self.cache_client.root.get_items_embed(uuids) ids_to_set = [] for i, ready in enumerate(ready_audio): if not ready: diff --git a/lightllm/server/embed_cache/impl/naive_memory_cache.py b/lightllm/server/embed_cache/impl/naive_memory_cache.py index 0ba701245..5fbfebf39 100644 --- a/lightllm/server/embed_cache/impl/naive_memory_cache.py +++ b/lightllm/server/embed_cache/impl/naive_memory_cache.py @@ -2,7 +2,7 @@ import threading import dataclasses import requests -from typing import Union +from typing import Union, Optional import torch import time from collections import deque @@ -87,41 +87,42 @@ def _clear(self): if deleted >= max_delete: break - def alloc(self, md5sum_list: list[str], token_num_list: list[int]) -> list[dict]: - results = [] + def alloc(self, md5sum_list: list[str], token_num_list: list[int]) -> Optional[list[dict]]: + now = time.time() with self.lock: + new_md5s = [m for m in md5sum_list if m not in self._md5_to_record] + new_needed = len(new_md5s) + + if self.occupied + new_needed > self.capacity: + self._clear() + if self.occupied + new_needed > self.capacity: + return None + + results: list[dict] = [] for md5sum, token_num in zip(md5sum_list, token_num_list): - t = time.time() - if md5sum not in self._md5_to_record: - if self.occupied >= self.capacity: - self._clear() - if self.occupied >= self.capacity: - results.append(None) - continue - id = uuid.uuid1() - id = id.int + if md5sum in self._md5_to_record: + rec = self._md5_to_record[md5sum] + rec.visittime = now + rec.ref += 1 + else: + uid_int = uuid.uuid1().int self._check_and_set_new_id_range(token_num) - record = Record( - id=id, + rec = Record( + id=uid_int, md5sum=md5sum, ref=1, data=False, embed=False, - createtime=t, - visittime=t, + createtime=now, + visittime=now, token_id=self.token_id_range_start, token_num=token_num, ) self.token_id_range_start += token_num - self._records[id] = record - self._md5_to_record[md5sum] = record + self._records[uid_int] = rec + self._md5_to_record[md5sum] = rec self.occupied += 1 - # cache hit - else: - record = self._md5_to_record[md5sum] - record.visittime = t - record.ref += 1 - results.append({"id": record.id, "token_id": record.token_id, "token_num": record.token_num}) + results.append({"id": rec.id, "token_id": rec.token_id, "token_num": rec.token_num}) return results def release(self, ids: list[int]) -> None: @@ -133,12 +134,12 @@ def set_items_data(self, ids: list[int]) -> None: for id in ids: self._records[id].data = True - def get_items_data(self, ids: list[int]) -> list[bool]: + def get_items_data(self, ids: list[int]) -> list[Optional[bool]]: return [self._records.get(i).data if i in self._records else False for i in ids] def set_items_embed(self, ids: list[int]) -> None: for id in ids: self._records[id].embed = True - def get_items_embed(self, ids: list[int]) -> list[bool]: + def get_items_embed(self, ids: list[int]) -> list[Optional[bool]]: return [self._records.get(i).embed if i in self._records else False for i in ids] diff --git a/lightllm/server/embed_cache/manager.py b/lightllm/server/embed_cache/manager.py index fe1d0a441..566124142 100644 --- a/lightllm/server/embed_cache/manager.py +++ b/lightllm/server/embed_cache/manager.py @@ -1,7 +1,7 @@ import rpyc import uuid import inspect -from typing import Union +from typing import Union, Optional from 
lightllm.utils.graceful_utils import graceful_registry from lightllm.server.embed_cache.impl.naive_memory_cache import InMemoryCache from rpyc.utils.classic import obtain @@ -22,7 +22,7 @@ def on_disconnect(self, conn): # (to finalize the service, if needed) pass - def exposed_alloc(self, md5sum_list: list[str], token_num_list: list[int]) -> dict: + def exposed_alloc(self, md5sum_list: list[str], token_num_list: list[int]) -> Optional[list[dict]]: md5sum_list = obtain(md5sum_list) token_num_list = obtain(token_num_list) record = self._impl.alloc(md5sum_list, token_num_list) @@ -34,19 +34,19 @@ def exposed_release(self, ids: list[int]) -> None: def exposed_set_items_data(self, ids: list[int]) -> None: ids = obtain(ids) - return self._impl.set_items_data(ids=ids) + return self._impl.set_items_data(ids) def exposed_get_items_data(self, ids: list[int]) -> list[bool]: ids = obtain(ids) - return self._impl.get_items_data(ids=ids) + return self._impl.get_items_data(ids) def exposed_set_items_embed(self, ids: list[int]) -> None: ids = obtain(ids) - return self._impl.set_items_embed(ids=ids) + return self._impl.set_items_embed(ids) def exposed_get_items_embed(self, ids: list[int]) -> list[bool]: ids = obtain(ids) - return self._impl.get_items_embed(ids=ids) + return self._impl.get_items_embed(ids) def start_cache_manager(port: int, args, pipe_writer): diff --git a/lightllm/server/httpserver/manager.py b/lightllm/server/httpserver/manager.py index ea27af5ad..80a48d677 100644 --- a/lightllm/server/httpserver/manager.py +++ b/lightllm/server/httpserver/manager.py @@ -113,53 +113,34 @@ def __init__( self.latest_success_infer_time_mark.set_value(int(time.time())) return - async def _alloc_resource(self, items, md5sums, tokens_nums, datas): + async def _alloc_resource(self, items, md5sums, token_nums, datas): wait_time = 1 - pending_idx = list(range(len(items))) - while pending_idx: - sub_md5sum = [md5sums[i] for i in pending_idx] - sub_tokens_num = [tokens_nums[i] for i in pending_idx] - - records = self.cache_client.root.alloc(sub_md5sum, sub_tokens_num) + while True: + records = self.cache_client.root.alloc(md5sums, token_nums) - if all(record is None for record in records): + if records is None: await asyncio.sleep(wait_time) - wait_time = min(wait_time + 2, 9) + wait_time = min(wait_time + 0.5, 2) continue - next_pending = [] # record为None,安排在下一轮 - uids_to_check = [] # record存在,本轮处理 - uid_to_idx = {} # uid → 原items下标 + uid_list = [] + for item, rec in zip(items, records): + item.uuid = rec["id"] + item.token_id = rec["token_id"] + item.token_num = rec["token_num"] + uid_list.append(rec["id"]) - for local_pos, record in enumerate(records): - global_pos = pending_idx[local_pos] + ready_flags = self.cache_client.root.get_items_data(uid_list) + need_write = [] - if record is None: - next_pending.append(global_pos) - continue + for uid, ready, data in zip(uid_list, ready_flags, datas): + if not ready: + create_shm(get_shm_name_data(uid), data) + need_write.append(uid) - uid = record["id"] - uid_to_idx[uid] = global_pos - uids_to_check.append(uid) - - item = items[global_pos] - item.uuid = uid - item.token_id = record["token_id"] - item.token_num = record["token_num"] - - if uids_to_check: - ready_flags = self.cache_client.root.get_items_data(uids_to_check) - need_write = [] - - for uid, ready in zip(uids_to_check, ready_flags): - if not ready: - idx = uid_to_idx[uid] - create_shm(get_shm_name_data(uid), datas[idx]) - need_write.append(uid) - if need_write: - 
self.cache_client.root.set_items_data(need_write) - pending_idx = next_pending - return + if need_write: + self.cache_client.root.set_items_data(need_write) + return async def _alloc_multimodal_resources(self, multimodal_params: MultimodalParams, sampling_params: SamplingParams): # 只有 P 和 NORMAL 节点需要真的管理多模态资源 From 99939689e5852d3df9abf8e825c3b84605e9dc8b Mon Sep 17 00:00:00 2001 From: wangzaijun Date: Tue, 22 Jul 2025 05:08:32 +0000 Subject: [PATCH 5/8] fix --- lightllm/models/whisper/whisper_audio.py | 4 ++-- lightllm/server/audioserver/manager.py | 3 ++- lightllm/server/httpserver/manager.py | 13 +++++++------ lightllm/server/visualserver/manager.py | 3 ++- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/lightllm/models/whisper/whisper_audio.py b/lightllm/models/whisper/whisper_audio.py index 55a870e0e..a89bfe8fc 100644 --- a/lightllm/models/whisper/whisper_audio.py +++ b/lightllm/models/whisper/whisper_audio.py @@ -11,7 +11,7 @@ from transformers.processing_utils import ProcessorMixin from lightllm.server.embed_cache.utils import tensor2bytes, read_shm, create_shm, get_shm_name_data, get_shm_name_embed from lightllm.server.multimodal_params import AudioItem - +from rpyc.utils.classic import obtain # tokenizer_class removed class WhisperProcessor(ProcessorMixin): @@ -190,7 +190,7 @@ def encode(self, audio_items: List[AudioItem]): audio_lens_after_cnn = np.array(audio_lens_after_cnn, dtype=np.int32) audio_token_num = (audio_lens_after_cnn - 2) // 2 + 1 - ready_audio = self.cache_client.root.get_items_embed(uuids) + ready_audio = obtain(self.cache_client.root.get_items_embed(uuids)) ids_to_set = [] for i, ready in enumerate(ready_audio): if not ready: diff --git a/lightllm/server/audioserver/manager.py b/lightllm/server/audioserver/manager.py index a403f5ae7..707fd11d0 100644 --- a/lightllm/server/audioserver/manager.py +++ b/lightllm/server/audioserver/manager.py @@ -14,6 +14,7 @@ from lightllm.server.multimodal_params import AudioItem from .model_infer.model_rpc import start_model_process, AudioModelRpcClient from lightllm.utils.graceful_utils import graceful_registry +from rpyc.utils.classic import obtain logger = init_logger(__name__) @@ -95,7 +96,7 @@ async def loop_for_fwd(self): multimodal_params = group_req_indexes.multimodal_params audio_uuids = [audio.uuid for audio in multimodal_params.audios] - ready_audio = self.cache_client.root.get_items_embed(audio_uuids) + ready_audio = obtain(self.cache_client.root.get_items_embed(audio_uuids)) for audio, ready in zip(multimodal_params.audios, ready_audio): if not ready: diff --git a/lightllm/server/httpserver/manager.py b/lightllm/server/httpserver/manager.py index 80a48d677..8d24a8355 100644 --- a/lightllm/server/httpserver/manager.py +++ b/lightllm/server/httpserver/manager.py @@ -32,6 +32,7 @@ from lightllm.utils.statics_utils import MovingAverage from lightllm.utils.config_utils import get_vocab_size from lightllm.utils.envs_utils import get_unique_server_name +from rpyc.utils.classic import obtain logger = init_logger(__name__) @@ -116,7 +117,7 @@ def __init__( async def _alloc_resource(self, items, md5sums, token_nums, datas): wait_time = 1 while True: - records = self.cache_client.root.alloc(md5sums, token_nums) + records = obtain(self.cache_client.root.alloc(md5sums, token_nums)) if records is None: await asyncio.sleep(wait_time) @@ -130,16 +131,16 @@ async def _alloc_resource(self, items, md5sums, token_nums, datas): item.token_num = rec["token_num"] uid_list.append(rec["id"]) - ready_flags = 
self.cache_client.root.get_items_data(uid_list) - need_write = [] + ready_flags = obtain(self.cache_client.root.get_items_data(uid_list)) + update_data_ids = [] for uid, ready, data in zip(uid_list, ready_flags, datas): if not ready: create_shm(get_shm_name_data(uid), data) - need_write.append(uid) + update_data_ids.append(uid) - if need_write: - self.cache_client.root.set_items_data(need_write) + if update_data_ids: + self.cache_client.root.set_items_data(update_data_ids) return async def _alloc_multimodal_resources(self, multimodal_params: MultimodalParams, sampling_params: SamplingParams): diff --git a/lightllm/server/visualserver/manager.py b/lightllm/server/visualserver/manager.py index 5d97074e7..7a4557c47 100644 --- a/lightllm/server/visualserver/manager.py +++ b/lightllm/server/visualserver/manager.py @@ -15,6 +15,7 @@ from lightllm.utils.log_utils import init_logger from lightllm.utils.graceful_utils import graceful_registry from lightllm.utils.process_check import start_parent_check_thread +from rpyc.utils.classic import obtain logger = init_logger(__name__) @@ -121,7 +122,7 @@ async def loop_for_fwd(self): multimodal_params = group_req_indexes.multimodal_params img_uuids = [img.uuid for img in multimodal_params.images] - ready_image = self.cache_client.root.get_items_embed(img_uuids) + ready_image = obtain(self.cache_client.root.get_items_embed(img_uuids)) for img, ready in zip(multimodal_params.images, ready_image): if not ready: From 2f710fd6ed94b1fb60d1c007e0719503ac515536 Mon Sep 17 00:00:00 2001 From: wangzaijun Date: Tue, 22 Jul 2025 05:19:15 +0000 Subject: [PATCH 6/8] fix --- lightllm/server/visualserver/model_infer/model_rpc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightllm/server/visualserver/model_infer/model_rpc.py b/lightllm/server/visualserver/model_infer/model_rpc.py index 2ca384886..d2d45f2fd 100644 --- a/lightllm/server/visualserver/model_infer/model_rpc.py +++ b/lightllm/server/visualserver/model_infer/model_rpc.py @@ -96,7 +96,7 @@ def exposed_encode(self, images: List[ImageItem]): all_img_embeds = all_img_embeds.to(torch.device("cpu")) if self.tp_rank_id == 0: - ready_flags = self.cache_client.root.get_items_embed(uuids) + ready_flags = obtain(self.cache_client.root.get_items_embed(uuids)) ids_to_set = [] for i, ready in enumerate(ready_flags): if ready: From 49a5a15857de70dadb248f634923e7b666e44998 Mon Sep 17 00:00:00 2001 From: hiworldwzj <30762946+hiworldwzj@users.noreply.github.com> Date: Tue, 22 Jul 2025 13:21:39 +0800 Subject: [PATCH 7/8] fix --- lightllm/models/whisper/whisper_audio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightllm/models/whisper/whisper_audio.py b/lightllm/models/whisper/whisper_audio.py index a89bfe8fc..c5959ea1e 100644 --- a/lightllm/models/whisper/whisper_audio.py +++ b/lightllm/models/whisper/whisper_audio.py @@ -199,4 +199,4 @@ def encode(self, audio_items: List[AudioItem]): create_shm(get_shm_name_embed(uid), cur_embed_bytes) ids_to_set.append(uid) if ids_to_set: - self.cache_client.root.set_items_data(ids=ids_to_set) + self.cache_client.root.set_items_embed(ids=ids_to_set) From a34fd116e53382ab273f65a64069dabfb9f19767 Mon Sep 17 00:00:00 2001 From: wangzaijun Date: Tue, 22 Jul 2025 05:42:27 +0000 Subject: [PATCH 8/8] fix --- .../CN/source/tutorial/api_server_args_zh.rst | 4 --- .../EN/source/tutorial/api_server_args_zh.rst | 4 --- lightllm/server/api_cli.py | 3 --- lightllm/server/core/objs/start_args_type.py | 1 - .../embed_cache/impl/naive_memory_cache.py | 26 
+++++++++---------- lightllm/server/httpserver/manager.py | 13 +++++----- 6 files changed, 18 insertions(+), 33 deletions(-) diff --git a/docs/CN/source/tutorial/api_server_args_zh.rst b/docs/CN/source/tutorial/api_server_args_zh.rst index d7c055ef4..f40528bba 100755 --- a/docs/CN/source/tutorial/api_server_args_zh.rst +++ b/docs/CN/source/tutorial/api_server_args_zh.rst @@ -274,10 +274,6 @@ attention类型选择参数 多模态资源的缓存服务器容量,默认为 ``200`` -.. option:: --cache_reserved_ratio - - 缓存服务器清理后的保留容量比例,默认为 ``0.5`` - .. option:: --visual_infer_batch_size 每次推理批次中处理的图像数量,默认为 ``1`` diff --git a/docs/EN/source/tutorial/api_server_args_zh.rst b/docs/EN/source/tutorial/api_server_args_zh.rst index 3b25ae85c..629d34bf8 100755 --- a/docs/EN/source/tutorial/api_server_args_zh.rst +++ b/docs/EN/source/tutorial/api_server_args_zh.rst @@ -273,10 +273,6 @@ Multimodal Parameters Cache server capacity for multimodal resources, default is ``200`` -.. option:: --cache_reserved_ratio - - Reserved capacity ratio after cache server cleanup, default is ``0.5`` - .. option:: --visual_infer_batch_size Number of images processed in each inference batch, default is ``1`` diff --git a/lightllm/server/api_cli.py b/lightllm/server/api_cli.py index c0ccd7a1c..ee5518ea8 100644 --- a/lightllm/server/api_cli.py +++ b/lightllm/server/api_cli.py @@ -288,9 +288,6 @@ def make_argument_parser() -> argparse.ArgumentParser: parser.add_argument( "--cache_capacity", type=int, default=200, help="cache server capacity for multimodal resources" ) - parser.add_argument( - "--cache_reserved_ratio", type=float, default=0.5, help="cache server reserved capacity ratio after clear" - ) parser.add_argument( "--data_type", type=str, diff --git a/lightllm/server/core/objs/start_args_type.py b/lightllm/server/core/objs/start_args_type.py index 15e344871..d4a205a15 100644 --- a/lightllm/server/core/objs/start_args_type.py +++ b/lightllm/server/core/objs/start_args_type.py @@ -57,7 +57,6 @@ class StartArgs: enable_decode_microbatch_overlap: bool = field(default=False) enable_prefill_microbatch_overlap: bool = field(default=False) cache_capacity: int = field(default=200) - cache_reserved_ratio: float = field(default=0.5) data_type: Optional[str] = field( default=None, metadata={"choices": ["fp16", "float16", "bf16", "bfloat16", "fp32", "float32"]} ) diff --git a/lightllm/server/embed_cache/impl/naive_memory_cache.py b/lightllm/server/embed_cache/impl/naive_memory_cache.py index 5fbfebf39..5477be22b 100644 --- a/lightllm/server/embed_cache/impl/naive_memory_cache.py +++ b/lightllm/server/embed_cache/impl/naive_memory_cache.py @@ -32,8 +32,6 @@ def __init__(self, args) -> None: self._records = dict() self._md5_to_record = dict() self.capacity = max(1, args.cache_capacity) - self.reserved = max(0, int(self.capacity * args.cache_reserved_ratio)) - self.reserved = min(self.reserved, self.capacity - 1) self.occupied = 0 self.expired_secs = 60 * 60 self.lock = threading.Lock() @@ -69,9 +67,9 @@ def _check_and_set_new_id_range(self, alloced_token_num): time.sleep(3) return - def _clear(self): + def _clear(self, free_max_count: int): deleted = 0 - max_delete = max(1, self.occupied - self.reserved) + max_delete = free_max_count items = sorted(self._records.items(), key=lambda x: x[1].visittime) t = time.time() for id, record in items: @@ -91,10 +89,10 @@ def alloc(self, md5sum_list: list[str], token_num_list: list[int]) -> Optional[l now = time.time() with self.lock: new_md5s = [m for m in md5sum_list if m not in self._md5_to_record] - new_needed = len(new_md5s) + 
new_needed = len(set(new_md5s)) if self.occupied + new_needed > self.capacity: - self._clear() + self._clear(free_max_count=new_needed - (self.capacity - self.occupied)) if self.occupied + new_needed > self.capacity: return None @@ -127,19 +125,19 @@ def alloc(self, md5sum_list: list[str], token_num_list: list[int]) -> Optional[l def release(self, ids: list[int]) -> None: with self.lock: - for id in ids: - self._records[id].ref -= 1 + for id_ in ids: + self._records[id_].ref -= 1 def set_items_data(self, ids: list[int]) -> None: - for id in ids: - self._records[id].data = True + for id_ in ids: + self._records[id_].data = True def get_items_data(self, ids: list[int]) -> list[Optional[bool]]: - return [self._records.get(i).data if i in self._records else False for i in ids] + return [self._records.get(id_).data if id_ in self._records else False for id_ in ids] def set_items_embed(self, ids: list[int]) -> None: - for id in ids: - self._records[id].embed = True + for id_ in ids: + self._records[id_].embed = True def get_items_embed(self, ids: list[int]) -> list[Optional[bool]]: - return [self._records.get(i).embed if i in self._records else False for i in ids] + return [self._records.get(id_).embed if id_ in self._records else False for id_ in ids] diff --git a/lightllm/server/httpserver/manager.py b/lightllm/server/httpserver/manager.py index 8d24a8355..96e48fb13 100644 --- a/lightllm/server/httpserver/manager.py +++ b/lightllm/server/httpserver/manager.py @@ -115,13 +115,12 @@ def __init__( return async def _alloc_resource(self, items, md5sums, token_nums, datas): - wait_time = 1 + while True: records = obtain(self.cache_client.root.alloc(md5sums, token_nums)) if records is None: - await asyncio.sleep(wait_time) - wait_time = min(wait_time + 0.5, 2) + await asyncio.sleep(0.1) continue uid_list = [] @@ -155,19 +154,19 @@ async def _alloc_multimodal_resources(self, multimodal_params: MultimodalParams, self.tokenizer.init_imageitem_extral_params(img, multimodal_params, sampling_params) data = img.read() # must after init_imageitem_extral_params - tokens_num = self.tokenizer.get_image_token_length(img) + token_num = self.tokenizer.get_image_token_length(img) md5sum = hashlib.md5(data).hexdigest() + "_" + str(hash(frozendict(img.extra_params))) md5sums.append(md5sum) - tokens_nums.append(tokens_num) + tokens_nums.append(token_num) datas.append(data) items.append(img) for audio in multimodal_params.audios: self.tokenizer.init_audioitem_extral_params(audio, multimodal_params, sampling_params) data = audio.read() - tokens_num = self.tokenizer.get_audio_token_length(audio) + token_num = self.tokenizer.get_audio_token_length(audio) md5sum = hashlib.md5(data).hexdigest() + "_" + str(hash(frozendict(audio.extra_params))) md5sums.append(md5sum) - tokens_nums.append(tokens_num) + tokens_nums.append(token_num) datas.append(data) items.append(audio)
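
Usage sketch (not part of the patches above): the series replaces the per-item embed-cache RPCs (alloc/get_item_data/set_item_data/get_item_embed/set_item_embed) with batched calls, cutting the rpyc round-trips per multimodal request from one per image/audio item to a constant few. The minimal client-side sketch below is an assumption-laden illustration of the resulting API, not code from the repository: the port, md5 strings, and token counts are placeholders, and the shared-memory writes done via create_shm/get_shm_name_data are elided.

    import asyncio
    import rpyc
    from rpyc.utils.classic import obtain

    async def cache_alloc_example(cache_port: int, md5sums: list, token_nums: list):
        # connect the same way the http/visual/audio servers do in this series
        client = rpyc.connect("localhost", cache_port, config={"allow_pickle": True})
        while True:
            # alloc() is all-or-nothing: it returns None while the cache is full,
            # otherwise one {"id", "token_id", "token_num"} record per md5sum
            records = obtain(client.root.alloc(md5sums, token_nums))
            if records is not None:
                break
            await asyncio.sleep(0.1)
        uids = [r["id"] for r in records]
        # only items whose payload is not cached yet need their data written
        ready = obtain(client.root.get_items_data(uids))
        missing = [uid for uid, ok in zip(uids, ready) if not ok]
        # for each uid in missing: create_shm(get_shm_name_data(uid), data)  # elided
        if missing:
            client.root.set_items_data(missing)
        # drop the references once the request is finished
        client.root.release(uids)
        return records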